Skip to content

Commit

Permalink
[#4558] Refactor grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Mar 13, 2019
1 parent 3b5c4af commit a4f3dd9
Show file tree
Hide file tree
Showing 22 changed files with 473 additions and 299 deletions.
Expand Up @@ -24,16 +24,16 @@
import com.navercorp.pinpoint.collector.service.async.AgentLifeCycleAsyncTaskService;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.grpc.server.DefaultServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.IdGeneratorServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.InetAddressFilter;
import com.navercorp.pinpoint.grpc.server.PermissionServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.ServerFactory;
import com.navercorp.pinpoint.grpc.server.ServerOption;
import com.navercorp.pinpoint.grpc.server.TransportMetadataFactory;
import com.navercorp.pinpoint.grpc.server.TransportMetadataServerInterceptor;
import com.navercorp.pinpoint.rpc.server.handler.ServerStateChangeEventHandler;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
Expand All @@ -42,7 +42,6 @@
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Resource;
import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand All @@ -56,12 +55,15 @@ public class AgentServer implements InitializingBean, DisposableBean, BeanNameAw
private String beanName;
private boolean enable = true;

private String bindIp;
private int bindPort;

private ExecutorService executor;
private AddressFilter addressFilter;
private DispatchHandler dispatchHandler;
private ServerOption serverOption;

private ServerFactory serverFactory;

private Server server;

@Autowired
Expand All @@ -70,7 +72,9 @@ public class AgentServer implements InitializingBean, DisposableBean, BeanNameAw
@Autowired
private AgentLifeCycleAsyncTaskService agentLifeCycleAsyncTask;

// TODO
private DispatchHandler dispatchHandler;


@Resource(name = "channelStateChangeEventHandlers")
private List<ServerStateChangeEventHandler> channelStateChangeEventHandlers = Collections.emptyList();
private ZookeeperClusterService clusterService;
Expand All @@ -81,20 +85,27 @@ public void afterPropertiesSet() throws Exception {
}

Assert.requireNonNull(this.beanName, "beanName must not be null");
Assert.requireNonNull(this.bindIp, "bindIp must not be null");
Assert.requireNonNull(this.dispatchHandler, "dispatchHandler must not be null");
Assert.requireNonNull(this.addressFilter, "addressFilter must not be null");

final ServerFactory serverFactory = new ServerFactory(this.beanName, this.bindPort, this.executor, this.serverOption);
serverFactory.addService(new AgentService(this.dispatchHandler));
serverFactory.addService(new KeepAliveService(this.agentEventAsyncTask, this.agentLifeCycleAsyncTask));
serverFactory.addTransportFilter(new DefaultServerTransportFilter());
serverFactory.addTransportFilter(new IdGeneratorServerTransportFilter());
serverFactory.addTransportFilter(new PermissionServerTransportFilter(new InetAddressFilter() {
@Override
public boolean accept(InetAddress inetAddress) {
return addressFilter.accept(inetAddress);
}
}));
this.serverFactory = new ServerFactory(beanName, this.bindIp, this.bindPort, executor);
ServerTransportFilter permissionServerTransportFilter = new PermissionServerTransportFilter(addressFilter);
this.serverFactory.addTransportFilter(permissionServerTransportFilter);

TransportMetadataFactory transportMetadataFactory = new TransportMetadataFactory();
final ServerTransportFilter metadataTransportFilter = new MetadataServerTransportFilter(transportMetadataFactory);
this.serverFactory.addTransportFilter(metadataTransportFilter);

ServerInterceptor transportMetadataServerInterceptor = new TransportMetadataServerInterceptor();
this.serverFactory.addInterceptor(transportMetadataServerInterceptor);

// Add service
BindableService agentService = new AgentService(dispatchHandler);
this.serverFactory.addService(agentService);

KeepAliveService keepAliveService = new KeepAliveService(agentEventAsyncTask, agentLifeCycleAsyncTask);
serverFactory.addService(keepAliveService);

this.server = serverFactory.build();
if (logger.isInfoEnabled()) {
Expand All @@ -112,6 +123,9 @@ public void destroy() throws Exception {
if (this.server != null) {
this.server.shutdown();
}
if (this.serverFactory != null) {
this.serverFactory.close();
}
}

// Test only
Expand Down
Expand Up @@ -20,6 +20,8 @@
import com.navercorp.pinpoint.io.request.ServerResponse;
import io.grpc.stub.StreamObserver;

import java.util.Objects;

/**
* @author jaehong.kim
*/
Expand All @@ -28,7 +30,7 @@ public class GrpcServerResponse implements ServerResponse<PResult> {
private final StreamObserver<PResult> responseObserver;

public GrpcServerResponse(StreamObserver<PResult> responseObserver) {
this.responseObserver = responseObserver;
this.responseObserver = Objects.requireNonNull(responseObserver, "responseObserver must not be null");
}

@Override
Expand Down
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -14,8 +14,9 @@
* limitations under the License.
*/

package com.navercorp.pinpoint.grpc.server;
package com.navercorp.pinpoint.collector.receiver.grpc;

import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.common.util.Assert;
import io.grpc.Attributes;
import io.grpc.Grpc;
Expand All @@ -33,10 +34,10 @@
public class PermissionServerTransportFilter extends ServerTransportFilter {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

private final InetAddressFilter inetAddressFilter;
private final AddressFilter addressFilter;

public PermissionServerTransportFilter(final InetAddressFilter inetAddressFilter) {
this.inetAddressFilter = Assert.requireNonNull(inetAddressFilter, "addressFilter must not be null");
public PermissionServerTransportFilter(final AddressFilter addressFilter) {
this.addressFilter = Assert.requireNonNull(addressFilter, "addressFilter must not be null");
}

@Override
Expand All @@ -49,17 +50,17 @@ public Attributes transportReady(final Attributes attributes) {
if (remoteSocketAddress == null) {
// Unauthenticated
logger.debug("Unauthenticated transport. TRANSPORT_ATTR_REMOTE_ADDR must not be null");
throw Status.UNAUTHENTICATED.asRuntimeException();
throw Status.INTERNAL.withDescription("RemoteAddress is null").asRuntimeException();
}

final InetAddress inetAddress = remoteSocketAddress.getAddress();
if (!inetAddressFilter.accept(inetAddress)) {
// Permission denied
logger.debug("Permission denied transport.");
throw Status.PERMISSION_DENIED.asRuntimeException();
if (addressFilter.accept(inetAddress)) {
return attributes;
}

return attributes;
// Permission denied
logger.debug("Permission denied transport.");
throw Status.PERMISSION_DENIED.asRuntimeException();
}

@Override
Expand Down
Expand Up @@ -20,49 +20,61 @@
import com.navercorp.pinpoint.collector.receiver.grpc.service.TraceService;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.grpc.server.DefaultServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.ServerFactory;
import io.grpc.BindableService;
import com.navercorp.pinpoint.grpc.server.ServerOption;
import io.grpc.Server;
import io.grpc.ServerTransportFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;

/**
* @author jaehong.kim
*/
public class SpanServer implements InitializingBean, DisposableBean, BeanNameAware {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
// Bean property

private String beanName;
private boolean enable;

private String bindIp;
private int bindPort;
private ExecutorService executor;
private AddressFilter addressFilter;

private ServerFactory serverFactory;
private Executor executor;

private DispatchHandler dispatchHandler;
private AddressFilter addressFilter;
private ServerOption serverOption;

private Server server;


@Override
public void afterPropertiesSet() throws Exception {
if (Boolean.FALSE == this.enable) {
return;
}

Assert.requireNonNull(this.beanName, "beanName must not be null");
Assert.requireNonNull(this.bindIp, "bindIp must not be null");
Assert.requireNonNull(this.dispatchHandler, "dispatchHandler must not be null");
Assert.requireNonNull(this.addressFilter, "addressFilter must not be null");

final ServerFactory serverFactory = new ServerFactory(this.beanName, this.bindPort, this.executor, this.serverOption);
serverFactory.addService(new TraceService(this.dispatchHandler));
serverFactory.addTransportFilter(new DefaultServerTransportFilter());

this.serverFactory = new ServerFactory(beanName, this.bindIp, this.bindPort, this.executor);
ServerTransportFilter permissionServerTransportFilter = new PermissionServerTransportFilter(addressFilter);
this.serverFactory.addTransportFilter(permissionServerTransportFilter);

// Add service
BindableService traceService = new TraceService(this.dispatchHandler);
this.serverFactory.addService(traceService);

this.server = serverFactory.build();
if (logger.isInfoEnabled()) {
logger.info("Start span server {}", this.server);
Expand All @@ -79,6 +91,9 @@ public void destroy() throws Exception {
if (this.server != null) {
this.server.shutdown();
}
if (this.serverFactory != null) {
this.serverFactory.close();
}
}

// Test only
Expand Down Expand Up @@ -113,7 +128,7 @@ public void setAddressFilter(AddressFilter addressFilter) {
this.addressFilter = addressFilter;
}

public void setExecutor(ExecutorService executor) {
public void setExecutor(Executor executor) {
this.executor = executor;
}

Expand Down
Expand Up @@ -20,21 +20,22 @@
import com.navercorp.pinpoint.collector.receiver.grpc.service.StatService;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.grpc.server.DefaultServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.ServerFactory;
import com.navercorp.pinpoint.grpc.server.TransportMetadataFactory;
import io.grpc.BindableService;
import com.navercorp.pinpoint.grpc.server.ServerOption;
import io.grpc.Server;
import io.grpc.ServerTransportFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import java.util.concurrent.ExecutorService;
import java.util.Objects;
import java.util.concurrent.Executor;

/**
* @author jaehong.kim
*/
public class StatServer implements InitializingBean, DisposableBean, BeanNameAware {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

Expand All @@ -44,9 +45,11 @@ public class StatServer implements InitializingBean, DisposableBean, BeanNameAwa
private String bindIp;
private int bindPort;

private ExecutorService executor;
private AddressFilter addressFilter;
private ServerFactory serverFactory;
private Executor executor;

private DispatchHandler dispatchHandler;
private AddressFilter addressFilter;
private ServerOption serverOption;

private Server server;
Expand All @@ -58,12 +61,21 @@ public void afterPropertiesSet() throws Exception {
}

Assert.requireNonNull(this.beanName, "beanName must not be null");
Assert.requireNonNull(this.bindIp, "bindIp must not be null");
Assert.requireNonNull(this.dispatchHandler, "dispatchHandler must not be null");
Assert.requireNonNull(this.addressFilter, "addressFilter must not be null");

final ServerFactory serverFactory = new ServerFactory(this.beanName, this.bindPort, this.executor, this.serverOption);
serverFactory.addTransportFilter(new DefaultServerTransportFilter());
serverFactory.addService(new StatService(this.dispatchHandler));

this.serverFactory = new ServerFactory(beanName, this.bindIp, this.bindPort, this.executor);
ServerTransportFilter permissionServerTransportFilter = new PermissionServerTransportFilter(addressFilter);
this.serverFactory.addTransportFilter(permissionServerTransportFilter);

// Add options

// Add service
BindableService statService = new StatService(this.dispatchHandler);
serverFactory.addService(statService);

this.server = serverFactory.build();
if (logger.isInfoEnabled()) {
logger.info("Start stat server {}", this.server);
Expand All @@ -80,6 +92,9 @@ public void destroy() throws Exception {
if (this.server != null) {
this.server.shutdown();
}
if (this.serverFactory != null) {
this.serverFactory.close();
}
}

// Test only
Expand Down Expand Up @@ -114,7 +129,7 @@ public void setAddressFilter(AddressFilter addressFilter) {
this.addressFilter = addressFilter;
}

public void setExecutor(ExecutorService executor) {
public void setExecutor(Executor executor) {
this.executor = executor;
}

Expand Down

0 comments on commit a4f3dd9

Please sign in to comment.