Skip to content

Commit

Permalink
The event socket now integrated into tcp-info (#107)
Browse files Browse the repository at this point in the history
The event socket now integrated into tcp-info with a basic client.
  • Loading branch information
pboothe committed Oct 21, 2019
1 parent ebe46a0 commit 2651da6
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 54 deletions.
67 changes: 67 additions & 0 deletions eventsocket/client.go
@@ -0,0 +1,67 @@
package eventsocket

import (
"bufio"
"context"
"encoding/json"
"log"
"net"
"strings"
"time"

"github.com/m-lab/go/rtx"
"github.com/m-lab/tcp-info/inetdiag"
)

// Handler is the interface that all interested users of the event socket
// notifications should implement. It has two methods, one called on Open events
// and one called on Close events.
type Handler interface {
Open(timestamp time.Time, uuid string, ID *inetdiag.SockID)
Close(timestamp time.Time, uuid string)
}

// MustRun will read from the passed-in socket filename until the context is
// cancelled. Any errors are fatal.
func MustRun(ctx context.Context, socket string, handler Handler) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, err := net.Dial("unix", socket)
rtx.Must(err, "Could not connect to %q", socket)
go func() {
// Close the connection when the context is done. Closing the underlying
// connection means that the scanner will soon terminate.
<-ctx.Done()
c.Close()
}()

// By default bufio.Scanner is based on newlines, which is perfect for our JSONL protocol.
s := bufio.NewScanner(c)
for s.Scan() {
var event FlowEvent
rtx.Must(json.Unmarshal(s.Bytes(), &event), "Could not unmarshall")
switch event.Event {
case Open:
handler.Open(event.Timestamp, event.UUID, event.ID)
case Close:
handler.Close(event.Timestamp, event.UUID)
default:
log.Println("Unknown event type:", event.Event)
}
}

// s.Err() is supposed to be nil under normal conditions. Scanner objects
// hide the expected EOF error and return nil after they encounter it,
// because EOF is the expected error. However, reading on a closed socket
// doesn't give you an EOF error and the error it does give you is
// unexported. The error it gives you should be treated the same as EOF,
// because it corresponds to the connection terminating under normal
// conditions. Because Scanner hides the EOF error, it should also hide the
// unexported one. Because Scanner doesn't, we do so here. Other errors
// should not be hidden.
err = s.Err()
if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
err = nil
}
rtx.Must(err, "Scanning of %f died with non-EOF error", socket)
}
67 changes: 67 additions & 0 deletions eventsocket/client_test.go
@@ -0,0 +1,67 @@
package eventsocket

import (
"context"
"io/ioutil"
"os"
"sync"
"testing"
"time"

"github.com/m-lab/go/rtx"
"github.com/m-lab/tcp-info/inetdiag"
)

type testHandler struct {
opens, closes int
wg sync.WaitGroup
}

func (t *testHandler) Open(timestamp time.Time, uuid string, id *inetdiag.SockID) {
t.opens++
t.wg.Done()
}

func (t *testHandler) Close(timestamp time.Time, uuid string) {
t.closes++
t.wg.Done()
}

func TestClient(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dir, err := ioutil.TempDir("", "TestEventSocketClient")
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

srv := New(dir + "/tcpevents.sock").(*server)
srv.Listen()
srvCtx, srvCancel := context.WithCancel(context.Background())
go srv.Serve(srvCtx)
defer srvCancel()

th := &testHandler{}
clientWg := sync.WaitGroup{}
clientWg.Add(1)
go func() {
MustRun(ctx, dir+"/tcpevents.sock", th)
clientWg.Done()
}()
th.wg.Add(2)

// Send an open event
srv.FlowCreated(time.Now(), "fakeuuid", inetdiag.SockID{})
// Send a bad event and make sure nothing crashes.
srv.eventC <- &FlowEvent{
Event: TCPEvent(1000),
Timestamp: time.Now(),
UUID: "fakeuuid",
}
// Send a deletion event
srv.FlowDeleted(time.Now(), "fakeuuid")
th.wg.Wait() // Wait until the handler gets two events!

// Cancel the context and wait until the client stops running.
cancel()
clientWg.Wait()
}
75 changes: 47 additions & 28 deletions eventsocket/eventsocket.go → eventsocket/server.go
Expand Up @@ -8,6 +8,8 @@ import (
"net"
"sync"
"time"

"github.com/m-lab/tcp-info/inetdiag"
)

//go:generate stringer -type=TCPEvent
Expand All @@ -25,21 +27,26 @@ const (
)

// FlowEvent is the data that is sent down the socket in JSONL form to the
// clients. The UUID, Timestamp, and Event fields are required, all other fields
// are optional.
// clients. The UUID, Timestamp, and Event fields will always be filled in, all
// other fields are optional.
type FlowEvent struct {
Event TCPEvent
Timestamp time.Time
Src, Dest string `json:",omitempty"`
SPort, DPort uint16 `json:",omitempty"`
UUID string
Event TCPEvent
Timestamp time.Time
UUID string
ID *inetdiag.SockID //`json:",omitempty"`
}

// Server is the interface that has the methods that actually serve the events
// over the unix domain socket. You should make new Server objects with
// eventsocket.New or eventsocket.NullServer.
type Server interface {
Listen() error
Serve(context.Context) error
FlowCreated(timestamp time.Time, uuid string, sockid inetdiag.SockID)
FlowDeleted(timestamp time.Time, uuid string)
}

// Server is the struct that has the methods that actually serve the events over
// the unix domain socket. You should make new Server objects with
// eventsocket.New unless you really know what you are doing (e.g. you are
// writing unit tests).
type Server struct {
type server struct {
eventC chan *FlowEvent
filename string
clients map[net.Conn]struct{}
Expand All @@ -48,14 +55,14 @@ type Server struct {
servingWG sync.WaitGroup
}

func (s *Server) addClient(c net.Conn) {
func (s *server) addClient(c net.Conn) {
log.Println("Adding new TCP event client", c)
s.mutex.Lock()
defer s.mutex.Unlock()
s.clients[c] = struct{}{}
}

func (s *Server) removeClient(c net.Conn) {
func (s *server) removeClient(c net.Conn) {
s.servingWG.Add(1)
defer s.servingWG.Done()
s.mutex.Lock()
Expand All @@ -68,7 +75,7 @@ func (s *Server) removeClient(c net.Conn) {
delete(s.clients, c)
}

func (s *Server) sendToAllListeners(data string) {
func (s *server) sendToAllListeners(data string) {
s.mutex.Lock()
defer s.mutex.Unlock()
for c := range s.clients {
Expand All @@ -85,7 +92,7 @@ func (s *Server) sendToAllListeners(data string) {
}
}

func (s *Server) notifyClients(ctx context.Context) {
func (s *server) notifyClients(ctx context.Context) {
s.servingWG.Add(1)
defer s.servingWG.Done()
for ctx.Err() == nil {
Expand All @@ -107,7 +114,7 @@ func (s *Server) notifyClients(ctx context.Context) {
// server will not immediately fail. In order for them to succeed, Serve()
// should be called. This function should only be called once for a given
// Server.
func (s *Server) Listen() error {
func (s *server) Listen() error {
// Add to the waitgroup inside Listen(), subtract from it in Serve(). That way,
// even if the Serve() goroutine is scheduled weirdly, servingWG.Wait() will
// definitely wait for Serve() to finish.
Expand All @@ -120,7 +127,7 @@ func (s *Server) Listen() error {
// Serve all clients that connect to this server until the context is canceled.
// It is expected that this will be called in a goroutine, after Listen has been
// called. This function should only be called once for a given server.
func (s *Server) Serve(ctx context.Context) error {
func (s *server) Serve(ctx context.Context) error {
defer s.servingWG.Done()
derivedCtx, derivedCancel := context.WithCancel(ctx)
defer derivedCancel()
Expand Down Expand Up @@ -153,33 +160,45 @@ func (s *Server) Serve(ctx context.Context) error {
}

// FlowCreated should be called whenever tcpinfo notices a new flow is created.
func (s *Server) FlowCreated(src, dest string, sport, dport uint16, uuid string) {
func (s *server) FlowCreated(timestamp time.Time, uuid string, id inetdiag.SockID) {
s.eventC <- &FlowEvent{
Event: Open,
Timestamp: time.Now(),
Src: src,
Dest: dest,
SPort: sport,
DPort: dport,
Timestamp: timestamp,
ID: &id,
UUID: uuid,
}
}

// FlowDeleted should be called whenever tcpinfo notices a flow has been retired.
func (s *Server) FlowDeleted(uuid string) {
func (s *server) FlowDeleted(timestamp time.Time, uuid string) {
s.eventC <- &FlowEvent{
Event: Close,
Timestamp: time.Now(),
Timestamp: timestamp,
UUID: uuid,
}
}

// New makes a new server that serves clients on the provided Unix domain socket.
func New(filename string) *Server {
func New(filename string) Server {
c := make(chan *FlowEvent, 100)
return &Server{
return &server{
filename: filename,
eventC: c,
clients: make(map[net.Conn]struct{}),
}
}

type nullServer struct{}

// Empty implementations that do no harm.
func (nullServer) Listen() error { return nil }
func (nullServer) Serve(context.Context) error { return nil }
func (nullServer) FlowCreated(timestamp time.Time, uuid string, id inetdiag.SockID) {}
func (nullServer) FlowDeleted(timestamp time.Time, uuid string) {}

// NullServer returns a Server that does nothing. It is made so that code that
// may or may not want to use a eventsocket can receive a Server interface and
// not have to worry about whether it is nil.
func NullServer() Server {
return nullServer{}
}
24 changes: 19 additions & 5 deletions eventsocket/eventsocket_test.go → eventsocket/server_test.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-test/deep"

"github.com/m-lab/go/rtx"
"github.com/m-lab/tcp-info/inetdiag"
)

func TestServer(t *testing.T) {
Expand All @@ -23,7 +24,7 @@ func TestServer(t *testing.T) {
rtx.Must(err, "Could not create tempdir")
defer os.RemoveAll(dir)

srv := New(dir + "/tcpevents.sock")
srv := New(dir + "/tcpevents.sock").(*server)
srv.Listen()
go srv.Serve(ctx)
log.Println("About to dial")
Expand All @@ -41,7 +42,7 @@ func TestServer(t *testing.T) {
}

// Send an event on the server, to cause the client to be notified by the server.
srv.FlowDeleted("fakeuuid")
srv.FlowDeleted(time.Now(), "fakeuuid")
r := bufio.NewScanner(c)
if !r.Scan() {
t.Error("Should have been able to scan until the next newline, but couldn't")
Expand All @@ -54,7 +55,8 @@ func TestServer(t *testing.T) {

// Send another event on the server, to cause the client to be notified by the server.
before := time.Now()
srv.FlowCreated("src", "dst", 1, 2, "fakeuuid2")
emptyID := inetdiag.SockID{}
srv.FlowCreated(time.Now(), "fakeuuid2", emptyID)
if !r.Scan() {
t.Error("Should have been able to scan until the next newline, but couldn't")
}
Expand All @@ -64,7 +66,7 @@ func TestServer(t *testing.T) {
t.Error("It should be true that", before, "<", event.Timestamp, "<", after)
}
event.Timestamp = time.Time{}
if diff := deep.Equal(event, FlowEvent{Open, time.Time{}, "src", "dst", 1, 2, "fakeuuid2"}); diff != nil {
if diff := deep.Equal(event, FlowEvent{Open, time.Time{}, "fakeuuid2", &emptyID}); diff != nil {
t.Error("Event differed from expected:", diff)
}

Expand All @@ -79,7 +81,7 @@ func TestServer(t *testing.T) {
// No SIGSEGV == success!

// Send an event to ensure that cleanup should occur.
srv.FlowDeleted("fakeuuid")
srv.FlowDeleted(time.Now(), "fakeuuid")

// Busy wait until the server has unregistered the client
for {
Expand Down Expand Up @@ -114,3 +116,15 @@ func TestTCPEvent_String(t *testing.T) {
})
}
}

func TestNullServer(t *testing.T) {
// Verify that the null server never crashes or returns a non-null error
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
srv := NullServer()
rtx.Must(srv.Listen(), "Could not listen")
rtx.Must(srv.Serve(ctx), "Could not serve")
srv.FlowCreated(time.Now(), "", inetdiag.SockID{})
srv.FlowDeleted(time.Now(), "")
// No crash == success
}

0 comments on commit 2651da6

Please sign in to comment.