Skip to content

Commit

Permalink
Ark2.0 support Netty plugin (#653)
Browse files Browse the repository at this point in the history
* Ark support Netty plugin

* fix mvn issue

* fix pom issue

---------

Co-authored-by: Xuhao Wang <1023605039@qq.com>
Co-authored-by: leo james <leojames.googol@gmail.com>
Co-authored-by: yuanyuancin <yuancin@163.com>
  • Loading branch information
4 people committed Nov 21, 2023
1 parent 7c4361d commit 362f9a8
Show file tree
Hide file tree
Showing 9 changed files with 632 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<module>sofa-ark-parent</module>
<module>sofa-ark-plugin/config-ark-plugin</module>
<module>sofa-ark-plugin/web-ark-plugin</module>
<module>sofa-ark-plugin/netty-ark-plugin</module>
</modules>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
<artifactId>sofa-ark-springboot-starter</artifactId>

<dependencies>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>1.0.22</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.ark.springboot;

import com.alipay.sofa.ark.springboot.condition.ConditionalOnArkEnabled;
import com.alipay.sofa.ark.springboot.processor.ArkEventHandlerProcessor;
import com.alipay.sofa.ark.springboot.processor.ArkServiceInjectProcessor;
import com.alipay.sofa.ark.springboot.web.ArkNettyReactiveWebServerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.reactive.ReactiveWebServerFactoryAutoConfiguration;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.embedded.netty.NettyRouteProvider;
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import reactor.netty.http.server.HttpServer;

import java.util.Collection;
import java.util.stream.Collectors;

@Configuration
@ConditionalOnArkEnabled
@AutoConfigureBefore(ReactiveWebServerFactoryAutoConfiguration.class)
public class ArkReactiveAutoConfigure {
@Bean
public static ArkServiceInjectProcessor serviceInjectProcessor() {
return new ArkServiceInjectProcessor();
}

@Bean
public static ArkEventHandlerProcessor arkEventHandlerProcessor() {
return new ArkEventHandlerProcessor();
}

@Configuration
@ConditionalOnMissingBean({ ReactiveWebServerFactory.class })
@ConditionalOnClass(value = { HttpServer.class }, name = { "com.alipay.sofa.ark.netty.ArkNettyIdentification" })
static class EmbeddedNetty {
EmbeddedNetty() {
}

@Bean
@ConditionalOnMissingBean
ReactorResourceFactory reactorServerResourceFactory() {
return new ReactorResourceFactory();
}

@Bean
NettyReactiveWebServerFactory nettyReactiveWebServerFactory(ReactorResourceFactory resourceFactory, ObjectProvider<NettyRouteProvider> routes, ObjectProvider<NettyServerCustomizer> serverCustomizers) {
NettyReactiveWebServerFactory serverFactory = new ArkNettyReactiveWebServerFactory();
serverFactory.setResourceFactory(resourceFactory);
routes.orderedStream().forEach((xva$0) -> {
serverFactory.addRouteProviders(new NettyRouteProvider[]{xva$0});
});
serverFactory.getServerCustomizers().addAll((Collection)serverCustomizers.orderedStream().collect(Collectors.toList()));
return serverFactory;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.ark.springboot.web;

import com.alipay.sofa.ark.spi.model.Biz;
import com.alipay.sofa.ark.spi.service.ArkInject;
import com.alipay.sofa.ark.spi.service.biz.BizManagerService;
import com.alipay.sofa.ark.spi.web.EmbeddedServerService;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.embedded.netty.NettyRouteProvider;
import org.springframework.boot.web.embedded.netty.NettyServerCustomizer;
import org.springframework.boot.web.embedded.netty.SslServerCustomizer;
import org.springframework.boot.web.server.WebServer;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.http.server.reactive.ContextPathCompositeHandler;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.LoopResources;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;

import static com.alipay.sofa.ark.spi.constant.Constants.ROOT_WEB_CONTEXT_PATH;

public class ArkNettyReactiveWebServerFactory extends NettyReactiveWebServerFactory {
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private Duration lifecycleTimeout;

private List<NettyRouteProvider> routeProviders = new ArrayList();
@ArkInject
private EmbeddedServerService embeddedNettyService;

@ArkInject
private BizManagerService bizManagerService;

private boolean useForwardHeaders;

private ReactorResourceFactory resourceFactory;

private int backgroundProcessorDelay;
private Set<NettyServerCustomizer> serverCustomizers = new LinkedHashSet();

public ArkNettyReactiveWebServerFactory() {
}

public ArkNettyReactiveWebServerFactory(int port) {
super(port);
}

@Override
public WebServer getWebServer(HttpHandler httpHandler) {
String contextPath = getContextPath();
Map<String, HttpHandler> handlerMap = new HashMap<>();
handlerMap.put(contextPath, httpHandler);
ContextPathCompositeHandler contextHandler = new ContextPathCompositeHandler(handlerMap);

if (embeddedNettyService == null) {
return super.getWebServer(contextHandler);
} else if (embeddedNettyService.getEmbedServer() == null) {
embeddedNettyService.setEmbedServer(initEmbedNetty());
}
HttpServer httpServer = (HttpServer) embeddedNettyService.getEmbedServer();
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(contextHandler);
ArkNettyWebServer webServer = (ArkNettyWebServer) createNettyWebServer(httpServer,
handlerAdapter, lifecycleTimeout);
webServer.setRouteProviders(this.routeProviders);

return webServer;
}

public String getContextPath() {
String contextPath = "";
if (bizManagerService == null) {
return contextPath;
}
Biz biz = bizManagerService.getBizByClassLoader(Thread.currentThread()
.getContextClassLoader());

if (!StringUtils.isEmpty(contextPath)) {
return contextPath;
} else if (biz != null) {
if (StringUtils.isEmpty(biz.getWebContextPath())) {
return ROOT_WEB_CONTEXT_PATH;
}
return biz.getWebContextPath();
} else {
return ROOT_WEB_CONTEXT_PATH;
}
}

WebServer createNettyWebServer(HttpServer httpServer, ReactorHttpHandlerAdapter handlerAdapter,
Duration lifecycleTimeout) {
return new ArkNettyWebServer(httpServer, handlerAdapter, lifecycleTimeout);
}

private HttpServer initEmbedNetty(){
HttpServer server = HttpServer.create();
if (this.resourceFactory != null) {
LoopResources resources = this.resourceFactory.getLoopResources();
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
server = ((HttpServer)server.runOn(resources)).bindAddress(this::getListenAddress);
} else {
server = server.bindAddress(this::getListenAddress);
}

if (this.getSsl() != null && this.getSsl().isEnabled()) {
server = this.customizeSslConfiguration(server);
}


server = server.protocol(this.listProtocols()).forwarded(this.useForwardHeaders);
return applyCustomizers(server);
}

private HttpServer customizeSslConfiguration(HttpServer httpServer) {
SslServerCustomizer sslServerCustomizer = new SslServerCustomizer(this.getSsl(),
this.getHttp2(), this.getSslStoreProvider());
return sslServerCustomizer.apply(httpServer);
}

private HttpProtocol[] listProtocols() {
List<HttpProtocol> protocols = new ArrayList();
protocols.add(HttpProtocol.HTTP11);
if (this.getHttp2() != null && this.getHttp2().isEnabled()) {
if (this.getSsl() != null && this.getSsl().isEnabled()) {
protocols.add(HttpProtocol.H2);
} else {
protocols.add(HttpProtocol.H2C);
}
}

return (HttpProtocol[]) protocols.toArray(new HttpProtocol[0]);
}

private HttpServer applyCustomizers(HttpServer server) {
NettyServerCustomizer customizer;
for (Iterator var2 = this.serverCustomizers.iterator(); var2.hasNext(); server = (HttpServer) customizer
.apply(server)) {
customizer = (NettyServerCustomizer) var2.next();
}

return server;
}

private InetSocketAddress getListenAddress() {
return this.getAddress() != null ? new InetSocketAddress(
this.getAddress().getHostAddress(), this.getPort()) : new InetSocketAddress(
this.getPort());
}

}

0 comments on commit 362f9a8

Please sign in to comment.