Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions network/network_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,40 @@ func TestNetworkIntegration_TLSOffloading(t *testing.T) {
assert.EqualError(t, err, "rpc error: code = Unauthenticated desc = TLS client certificate authentication failed")
assert.Nil(t, msg)
})
t.Run("certificate revoked/denied", func(t *testing.T) {
testDirectory := io.TestDirectory(t)
// Start server node (node1)
node1 := startNode(t, "node1", testDirectory, func(serverCfg *core.ServerConfig, cfg *Config) {
serverCfg.TLS.Offload = core.OffloadIncomingTLS
serverCfg.TLS.ClientCertHeaderName = "client-cert"
})

// Load client cert and add it to the denylist
clientCertBytes, err := os.ReadFile(testCertAndKeyFile)
require.NoError(t, err)
cert, err := core.ParseCertificates(clientCertBytes)
require.NoError(t, err)
pki.SetNewDenylistWithCert(t, node1.network.pkiValidator, cert[0])

// Create client (node2) that connects to server node
grpcConn, err := grpcLib.Dial(nameToAddress(t, "node1"), grpcLib.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer grpcConn.Close()
ctx := context.Background()
outgoingMD := metadata.MD{}
outgoingMD.Set("peerID", "client")
outgoingMD.Set("nodeDID", "did:nuts:node2")
outgoingMD.Set("client-cert", url.QueryEscape(string(clientCertBytes)))
outgoingContext := metadata.NewOutgoingContext(ctx, outgoingMD)
client := v2.NewProtocolClient(grpcConn)
result, err := client.Stream(outgoingContext)
require.NoError(t, err)

// Assert connection is rejected
msg, err := result.Recv()
assert.EqualError(t, err, "rpc error: code = Unauthenticated desc = TLS client certificate validation failed")
assert.Nil(t, msg)
})
})
}

Expand Down
1 change: 1 addition & 0 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func TestNetwork_Configure(t *testing.T) {
ctx.protocol.EXPECT().Configure(gomock.Any())
ctx.pkiValidator.EXPECT().AddTruststore(gomock.Any())
ctx.pkiValidator.EXPECT().SetVerifyPeerCertificateFunc(gomock.Any()).Times(2) // tls.Configs: client, selfTestDialer
ctx.pkiValidator.EXPECT().SubscribeDenied(gomock.Any())
ctx.network.connectionManager = nil

cfg := *core.NewServerConfig()
Expand Down
3 changes: 3 additions & 0 deletions network/transport/grpc/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package grpc

import (
"crypto/x509"
"errors"
"fmt"
"github.com/nuts-foundation/go-did/did"
Expand Down Expand Up @@ -83,6 +84,7 @@ func (t tlsAuthenticator) Authenticate(nodeDID did.DID, grpcPeer grpcPeer.Peer,
Debug("Connection successfully authenticated")
peer.NodeDID = nodeDID
peer.Authenticated = true
peer.Certificate = peerCertificate
return peer, nil
}

Expand All @@ -96,5 +98,6 @@ type dummyAuthenticator struct{}
func (d dummyAuthenticator) Authenticate(nodeDID did.DID, _ grpcPeer.Peer, peer transport.Peer) (transport.Peer, error) {
peer.NodeDID = nodeDID
peer.Authenticated = true
peer.Certificate = &x509.Certificate{}
return peer, nil
}
6 changes: 6 additions & 0 deletions network/transport/grpc/authenticator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func Test_tlsAuthenticator_Authenticate(t *testing.T) {
expectedPeer := transport.Peer{
NodeDID: nodeDID,
Authenticated: true,
Certificate: cert,
}

t.Run("ok", func(t *testing.T) {
Expand Down Expand Up @@ -90,6 +91,11 @@ func Test_tlsAuthenticator_Authenticate(t *testing.T) {
},
},
}
expectedPeer := transport.Peer{
NodeDID: nodeDID,
Authenticated: true,
Certificate: wildcardCert,
}

authenticatedPeer, err := authenticator.Authenticate(nodeDID, grpcPeer, transport.Peer{})

Expand Down
35 changes: 13 additions & 22 deletions network/transport/grpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ type Connection interface {
// IsConnected returns whether the connection is active or not.
IsConnected() bool

// IsProtocolConnected returns whether the given protocol is active on the connection.
IsProtocolConnected(protocol Protocol) bool

// IsAuthenticated returns whether teh given connection is authenticated.
IsAuthenticated() bool

Expand Down Expand Up @@ -152,9 +149,8 @@ func (mc *conn) waitUntilDisconnected() {
mc.mux.RUnlock()
return
}
done := mc.ctx.Done()
mc.mux.RUnlock()
<-done
<-mc.ctx.Done()
}

func (mc *conn) verifyOrSetPeerID(id transport.PeerID) bool {
Expand Down Expand Up @@ -210,22 +206,26 @@ func (mc *conn) registerStream(protocol Protocol, stream Stream) bool {
mc.startSending(protocol, stream)

// A connection can have multiple active streams, but if one of them is closed, all of them should be closed, also closing the underlying connection.
go func(cancel func()) {
go func() {
<-stream.Context().Done()
cancel()
}(mc.cancelCtx)
mc.cancelCtx()
}()

return true
}

func (mc *conn) startReceiving(protocol Protocol, stream Stream) {
peer := mc.Peer() // copy Peer, because it will be nil when logging after disconnecting.
atomic.AddInt32(&mc.activeGoroutines, 1)
go func(activeGoroutines *int32, cancel func()) {
go func(activeGoroutines *int32) {
defer atomic.AddInt32(activeGoroutines, -1)
for {
message := protocol.CreateEnvelope()
err := stream.RecvMsg(message)
err := stream.RecvMsg(message) // blocking
if mc.ctx.Err() != nil {
// connection has been closed: drop message and stop receiving
return
}
if err != nil {
errStatus, isStatusError := status.FromError(err)
if errors.Is(err, io.EOF) || (isStatusError && errStatus.Code() == codes.Canceled) {
Expand All @@ -241,7 +241,7 @@ func (mc *conn) startReceiving(protocol Protocol, stream Stream) {
Warn("Peer connection error")
}
mc.status.Store(errStatus)
cancel()
mc.cancelCtx()
break
}

Expand All @@ -255,20 +255,19 @@ func (mc *conn) startReceiving(protocol Protocol, stream Stream) {
Warn("Error handling message")
}
}
}(&mc.activeGoroutines, mc.cancelCtx)
}(&mc.activeGoroutines)
}

func (mc *conn) startSending(protocol Protocol, stream Stream) {
outbox := mc.outboxes[protocol.MethodName()]
done := mc.ctx.Done()

atomic.AddInt32(&mc.activeGoroutines, 1)
go func(activeGoroutines *int32) {
defer atomic.AddInt32(activeGoroutines, -1)
loop:
for {
select {
case <-done:
case <-mc.ctx.Done():
break loop
case envelope := <-outbox:
if envelope == nil {
Expand Down Expand Up @@ -315,14 +314,6 @@ func (mc *conn) IsConnected() bool {
return len(mc.streams) > 0
}

func (mc *conn) IsProtocolConnected(protocol Protocol) bool {
mc.mux.RLock()
defer mc.mux.RUnlock()

_, ok := mc.streams[protocol.MethodName()]
return ok
}

func (mc *conn) IsAuthenticated() bool {
return mc.Peer().Authenticated
}
Expand Down
56 changes: 37 additions & 19 deletions network/transport/grpc/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package grpc

import (
"context"
"crypto/x509"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"github.com/nuts-foundation/go-did/did"
Expand All @@ -36,7 +38,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
grpcPeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -123,7 +124,10 @@ func NewGRPCConnectionManager(config Config, connectionStore stoabs.KVStore, nod
cm.addressBook = newAddressBook(connectionStore, config.backoffCreator)
cm.registerPrometheusMetrics()
cm.ctx, cm.ctxCancel = context.WithCancel(context.Background())

cm.lastCertificateValidation.Store(&time.Time{})
if config.tlsEnabled() {
config.pkiValidator.SubscribeDenied(cm.revalidatePeers)
}
return cm, nil
}

Expand All @@ -145,10 +149,11 @@ type grpcConnectionManager struct {
addressBook *addressBook

dialer
connectLoopWG sync.WaitGroup
dialOptions []grpc.DialOption
connectionTimeout time.Duration
connections *connectionList
connectLoopWG sync.WaitGroup
dialOptions []grpc.DialOption
connectionTimeout time.Duration
connections *connectionList
lastCertificateValidation atomic.Pointer[time.Time]
}

// newGrpcServer configures a new grpc.Server
Expand Down Expand Up @@ -183,18 +188,6 @@ func newGrpcServer(config Config) (*grpc.Server, error) {
serverInterceptors = append(serverInterceptors, ipInterceptor)
serverOpts = append(serverOpts, grpc.ChainStreamInterceptor(serverInterceptors...))

// Define the keepalive policy for the grpc server in such a way that connections are not long-lived.
// By blocking long-lived connections we ensure that connections are periodically reauthorized, namely
// so that a remote host which was authorized at the time of connection can become unauthorized and
// this is correctly enforced.
//
// Configured per https://github.com/grpc/grpc-go/blob/c9d3ea5673252d212c69f3d3c10ce1d7b287a86b/examples/features/keepalive/server/main.go#L43
keepaliveParams := keepalive.ServerParameters{
MaxConnectionAge: 15 * time.Minute, // If any connection is alive for too long, send a GOAWAY
MaxConnectionAgeGrace: 15 * time.Second, // Allow time for pending RPCs to complete before forcibly closing connections
}
serverOpts = append(serverOpts, grpc.KeepaliveParams(keepaliveParams))

// Create gRPC server for inbound connectionList and associate it with the protocols
return grpc.NewServer(serverOpts...), nil
}
Expand Down Expand Up @@ -401,7 +394,12 @@ func (s *grpcConnectionManager) Contacts() []transport.Contact {
}

func (s *grpcConnectionManager) Diagnostics() []core.DiagnosticResult {
return append(append([]core.DiagnosticResult{ownPeerIDStatistic{s.config.peerID}}, s.connections.Diagnostics()...))
return append(
[]core.DiagnosticResult{
lastCertificateValidationStatistic{*s.lastCertificateValidation.Load()},
ownPeerIDStatistic{s.config.peerID},
},
s.connections.Diagnostics()...)
}

// RegisterService implements grpc.ServiceRegistrar to register the gRPC services protocols expose.
Expand Down Expand Up @@ -544,6 +542,26 @@ func (s *grpcConnectionManager) authenticate(nodeDID did.DID, peer transport.Pee
return peer, nil
}

// revalidatePeers verifies for all peers the x509.Certificate provided during TLS handshake is still valid.
func (s *grpcConnectionManager) revalidatePeers() {
var err error
now := nowFunc()
s.lastCertificateValidation.Store(&now)
s.connections.forEach(func(conn Connection) {
peerCert := conn.Peer().Certificate
if nowFunc().After(peerCert.NotAfter) {
log.Logger().WithError(errors.New("certificate expired while in use")).WithFields(conn.Peer().ToFields()).Info("Disconnected peer")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why check this? This will be checked by the Golang's TLS when setting up the connection anyway?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we're re-validating the peer's certificate on a connection that may last for a month? The only things that could have changed since it was first accepted is that it is banned or expired.

conn.disconnect()
return
}
err = s.config.pkiValidator.Validate([]*x509.Certificate{peerCert})
if err != nil {
log.Logger().WithError(err).WithFields(conn.Peer().ToFields()).Warn("Disconnected peer")
conn.disconnect()
}
})
}

func (s *grpcConnectionManager) handleInboundStream(protocol Protocol, inboundStream grpc.ServerStream) error {
peerFromCtx, _ := grpcPeer.FromContext(inboundStream.Context())
log.Logger().
Expand Down
Loading