Skip to content

Commit

Permalink
observability: implement server interceptor for logging (grpc#8992)
Browse files Browse the repository at this point in the history
  • Loading branch information
DNVindhya authored and temawi committed Apr 8, 2022
1 parent fd99e7c commit 3c843fb
Show file tree
Hide file tree
Showing 8 changed files with 650 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static synchronized void grpcInit() {
// TODO(dnvindhya): PROJECT_ID to be replaced with configured destinationProjectId
Sink sink = new GcpLogSink(PROJECT_ID);
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(sink));
LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl());
LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(sink));
// TODO(sanjaypujare): initialize customTags map
initialized = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,223 @@

package io.grpc.observability.interceptors;

import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import io.grpc.Internal;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.net.SocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/** A logging interceptor for {@code LoggingServerProvider}. */
@Internal
public final class InternalLoggingServerInterceptor implements ServerInterceptor {
private static final Logger logger = Logger
.getLogger(InternalLoggingServerInterceptor.class.getName());

private final LogHelper helper;

public interface Factory {
ServerInterceptor create();
}

public static class FactoryImpl implements Factory {
private final Sink sink;
private final LogHelper helper;

static LogHelper createLogHelper(Sink sink, TimeProvider provider) {
return new LogHelper(sink, provider);
}

public FactoryImpl(Sink sink) {
this.sink = sink;
this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
}

@Override
public ServerInterceptor create() {
return new InternalLoggingServerInterceptor();
return new InternalLoggingServerInterceptor(helper);
}

/**
* Closes the sink instance.
*/
public void close() {
if (sink != null) {
sink.close();
}
}
}

private InternalLoggingServerInterceptor(LogHelper helper) {
this.helper = helper;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
// TODO(dnvindhya) implement the interceptor
return null;
final AtomicLong seq = new AtomicLong(1);
final String rpcId = UUID.randomUUID().toString();
final String authority = call.getAuthority();
final String serviceName = call.getMethodDescriptor().getServiceName();
final String methodName = call.getMethodDescriptor().getBareMethodName();
final SocketAddress peerAddress = LogHelper.getPeerAddress(call.getAttributes());
Deadline deadline = Context.current().getDeadline();
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));

// TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged
// according to config. Until then always return true.
if (!helper.isMethodToBeLogged(call.getMethodDescriptor().getFullMethodName())) {
return next.startCall(call, headers);
}

// Event: EventType.GRPC_CALL_REQUEST_HEADER
try {
helper.logRequestHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
// exceptions to callers which will lead to RPC getting aborted.
// Expected exceptions to be caught:
// 1. IllegalArgumentException
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}

ServerCall<ReqT, RespT> wrapperCall =
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER
try {
helper.logResponseHeader(
seq.getAndIncrement(),
serviceName,
methodName,
headers,
EventLogger.LOGGER_SERVER,
rpcId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
super.sendHeaders(headers);
}

@Override
public void sendMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
EventLogger.LOGGER_SERVER,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
super.sendMessage(message);
}

@Override
public void close(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
status,
trailers,
EventLogger.LOGGER_SERVER,
rpcId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
super.close(status, trailers);
}
};

ServerCall.Listener<ReqT> listener = next.startCall(wrapperCall, headers);
return new SimpleForwardingServerCallListener<ReqT>(listener) {
@Override
public void onMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
EventLogger.LOGGER_SERVER,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
super.onMessage(message);
}

@Override
public void onHalfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_SERVER,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
super.onHalfClose();
}

@Override
public void onCancel() {
// Event: EventType.GRPC_CALL_CANCEL
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_SERVER,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
super.onCancel();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel;
import java.net.Inet4Address;
Expand Down Expand Up @@ -65,8 +66,7 @@ class LogHelper {
}

/**
* Logs the request header.
* Binary logging equivalent of logClientHeader.
* Logs the request header. Binary logging equivalent of logClientHeader.
*/
void logRequestHeader(
long seqId,
Expand Down Expand Up @@ -95,7 +95,7 @@ void logRequestHeader(
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.proto)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setRpcId(rpcId);
if (timeout != null) {
Expand All @@ -108,8 +108,7 @@ void logRequestHeader(
}

/**
* Logs the reponse header.
* Binary logging equivalent of logServerHeader.
* Logs the reponse header. Binary logging equivalent of logServerHeader.
*/
void logResponseHeader(
long seqId,
Expand All @@ -136,7 +135,7 @@ void logResponseHeader(
.setEventType(EventType.GRPC_CALL_RESPONSE_HEADER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.proto)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setRpcId(rpcId);
if (peerAddress != null) {
Expand Down Expand Up @@ -173,7 +172,7 @@ void logTrailer(
.setEventType(EventType.GRPC_CALL_TRAILER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.proto)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setStatusCode(status.getCode().value())
.setRpcId(rpcId);
Expand All @@ -198,9 +197,9 @@ <T> void logRpcMessage(
long seqId,
String serviceName,
String methodName,
GrpcLogRecord.EventType eventType,
EventType eventType,
T message,
GrpcLogRecord.EventLogger eventLogger,
EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
Expand All @@ -211,11 +210,23 @@ <T> void logRpcMessage(
"event type must correspond to client message or server message");
checkNotNull(message, "message");

// TODO(dnvindhya): Convert message to bystestring and also log the message
// byte[] messageArray = (byte[])message;
// int messageLength = messageArray.length;
// ByteString messageData =
// ByteString.copyFrom((byte[]) message, 0, ((byte[]) message).length);
// TODO(dnvindhya): Implement conversion of generics to ByteString
// Following is a temporary workaround to log if message is of following types :
// 1. com.google.protobuf.Message
// 2. byte[]
byte[] messageBytesArray = null;
if (message instanceof com.google.protobuf.Message) {
messageBytesArray = ((com.google.protobuf.Message)message).toByteArray();
} else if (message instanceof byte[]) {
messageBytesArray = (byte[]) message;
} else {
logger.log(Level.WARNING, "message is of UNKNOWN type, message and payload_size fields"
+ "of GrpcLogRecord proto will not be logged");
}
PayloadBuilder<ByteString> pair = null;
if (messageBytesArray != null) {
pair = createMesageProto(messageBytesArray);
}

GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
Expand All @@ -225,6 +236,12 @@ <T> void logRpcMessage(
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
if (pair != null && pair.size != 0) {
logEntryBuilder.setPayloadSize(pair.size);
}
if (pair != null && pair.payload != null) {
logEntryBuilder.setMessage(pair.payload);
}
sink.write(logEntryBuilder.build());
}

Expand Down Expand Up @@ -282,11 +299,11 @@ GrpcLogRecord.Builder createTimestamp() {
}

static final class PayloadBuilder<T> {
T proto;
T payload;
int size;

private PayloadBuilder(T proto, int size) {
this.proto = proto;
private PayloadBuilder(T payload, int size) {
this.payload = payload;
this.size = size;
}
}
Expand Down Expand Up @@ -315,6 +332,13 @@ static PayloadBuilder<GrpcLogRecord.Metadata.Builder> createMetadataProto(Metada
return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes);
}

static PayloadBuilder<ByteString> createMesageProto(byte[] message) {
int messageLength = message.length;
ByteString messageData =
ByteString.copyFrom(message, 0, messageLength);
return new PayloadBuilder<ByteString>(messageData, messageLength);
}

static Address socketAddressToProto(SocketAddress address) {
checkNotNull(address, "address");
Address.Builder builder = Address.newBuilder();
Expand Down Expand Up @@ -343,7 +367,7 @@ static Address socketAddressToProto(SocketAddress address) {
}

/**
* Retrieves socket address.
* Retrieves socket address.
*/
static SocketAddress getPeerAddress(Attributes streamAttributes) {
return streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
Expand All @@ -367,5 +391,4 @@ static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1)
boolean isMethodToBeLogged(String fullMethodName) {
return true;
}

}
Loading

0 comments on commit 3c843fb

Please sign in to comment.