Skip to content

Commit

Permalink
Prototype to trace incoming and outgoing connections from containes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Wilkie committed May 19, 2015
1 parent 6b5cc79 commit 96b4687
Show file tree
Hide file tree
Showing 11 changed files with 930 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ experimental/graphviz/graphviz
experimental/oneshot/oneshot
experimental/example/qotd/qotd
experimental/example/goapp/app
experimental/tracer/tracer
*sublime-project
*sublime-workspace
*npm-debug.log
Expand Down
5 changes: 5 additions & 0 deletions experimental/tracer/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

tracer: *.go
go get .
go build .

9 changes: 9 additions & 0 deletions experimental/tracer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Run tracer:
- make
- ./tracer.sh start

TODO:
- need to stich traces together
- deal with persistant connections
- make it work for goroutines
- test with jvm based app
165 changes: 165 additions & 0 deletions experimental/tracer/fd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"bufio"
"encoding/hex"
"fmt"
"net"
"os"
"regexp"
"strconv"
"time"
)

const (
LISTENING = iota
INCOMING
OUTGOING
)

const (
SOCKET_REGEX = `^socket:\[(\d+)\]$`
TCP_REGEX = `^\s*(?P<fd>\d+): (?P<localaddr>[A-F0-9]{8}):(?P<localport>[A-F0-9]{4}) ` +
`(?P<remoteaddr>[A-F0-9]{8}):(?P<remoteport>[A-F0-9]{4}) (?:[A-F0-9]{2}) (?:[A-F0-9]{8}):(?:[A-F0-9]{8}) ` +
`(?:[A-F0-9]{2}):(?:[A-F0-9]{8}) (?:[A-F0-9]{8}) \s+(?:\d+) \s+(?:\d+) (?P<inode>\d+)`
)

var (
socketRegex = regexp.MustCompile(SOCKET_REGEX)
tcpRegexp = regexp.MustCompile(TCP_REGEX)
)

type Fd struct {
direction int
fd int

start int64
stop int64
sent int64
received int64

localAddr net.IP
localPort uint16
remoteAddr net.IP
remotePort uint16

// Fds are connections, and can have a causal-link to other Fds
cause *Fd
effect []*Fd
}

func getLocalAddr(pid, fd int) (addr net.IP, port uint16, err error) {
var (
socket string
match []string
inode int
tcpFile *os.File
scanner *bufio.Scanner
candidate int
port64 int64
)

socket, err = os.Readlink(fmt.Sprintf("/proc/%d/fd/%d", pid, fd))
if err != nil {
return
}

match = socketRegex.FindStringSubmatch(socket)
if match == nil {
err = fmt.Errorf("Fd %d not a socket", fd)
return
}

inode, err = strconv.Atoi(match[1])
if err != nil {
return
}

tcpFile, err = os.Open(fmt.Sprintf("/proc/%d/net/tcp", pid))
if err != nil {
return
}
defer tcpFile.Close()

scanner = bufio.NewScanner(tcpFile)
for scanner.Scan() {
match = tcpRegexp.FindStringSubmatch(scanner.Text())
if match == nil {
continue
}

candidate, err = strconv.Atoi(match[6])
if err != nil {
return
}
if candidate != inode {
continue
}

addr = make([]byte, 4)
if _, err = hex.Decode(addr, []byte(match[2])); err != nil {
return
}
addr[0], addr[1], addr[2], addr[3] = addr[3], addr[2], addr[1], addr[0]

// use a 32 bit int for target, at the result is a uint16
port64, err = strconv.ParseInt(match[3], 16, 32)
if err != nil {
return
}
port = uint16(port64)

return
}

if err = scanner.Err(); err != nil {
return
}

err = fmt.Errorf("Fd %d not found for proc %d", fd, pid)
return
}

// We want to get the listening address from /proc
func NewListeningFd(pid, fd int) (*Fd, error) {
localAddr, localPort, err := getLocalAddr(pid, fd)
if err != nil {
return nil, err
}

return &Fd{
direction: LISTENING, fd: fd, start: time.Now().Unix(),
localAddr: localAddr, localPort: uint16(localPort),
}, nil
}

// We intercepted a connect syscall
func NewConnectionFd(pid, fd int, remoteAddr net.IP, remotePort uint16) (*Fd, error) {
localAddr, localPort, err := getLocalAddr(pid, fd)
if err != nil {
return nil, err
}

return &Fd{
direction: OUTGOING, fd: fd, start: time.Now().Unix(),
localAddr: localAddr, localPort: uint16(localPort),
remoteAddr: remoteAddr, remotePort: remotePort,
}, nil
}

// We got a new connection on a listening socket
func (fd *Fd) NewConnection(addr net.IP, port uint16, newFd int) (*Fd, error) {
if fd.direction != LISTENING {
return nil, fmt.Errorf("New connection on non-listening fd!")
}

return &Fd{
direction: INCOMING, fd: newFd, start: time.Now().Unix(),
localAddr: fd.localAddr, localPort: fd.localPort,
remoteAddr: addr, remotePort: port,
}, nil
}

func (fd *Fd) Close() {
fd.stop = time.Now().Unix()
}
56 changes: 56 additions & 0 deletions experimental/tracer/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"

"github.com/gorilla/mux"
)

func badRequest(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusBadRequest)
log.Printf("Error handling http request: %v", err.Error())
}

func (t *Tracer) http(port int) {
router := mux.NewRouter()

router.Methods("GET").Path("/pid").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pids := t.AttachedPids()
if err := json.NewEncoder(w).Encode(pids); err != nil {
badRequest(w, err)
}
})

router.Methods("POST").Path("/pid/{pid:\\d+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pid, _ := strconv.Atoi(mux.Vars(r)["pid"])
t.TraceProcess(pid)
w.WriteHeader(204)
})

router.Methods("DELETE").Path("/pid/{pid:\\d+}").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pid, _ := strconv.Atoi(mux.Vars(r)["pid"])
t.StopTracing(pid)
w.WriteHeader(204)
})

router.Methods("GET").Path("/trace").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
traces := t.store.Traces()
if err := json.NewEncoder(w).Encode(traces); err != nil {
badRequest(w, err)
}
})

log.Printf("Launching HTTP API on port %d", port)
srv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: router,
}

if err := srv.ListenAndServe(); err != nil {
log.Printf("Unable to create http listener: ", err)
}
}
30 changes: 30 additions & 0 deletions experimental/tracer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"log"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
"syscall"
)

func main() {
tracer := NewTracer()
go tracer.http(6060)
handleSignals()
}

func handleSignals() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGQUIT)
buf := make([]byte, 1<<20)
for {
sig := <-sigs
switch sig {
case syscall.SIGQUIT:
stacklen := runtime.Stack(buf, true)
log.Printf("=== received SIGQUIT ===\n*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
}
}
}
94 changes: 94 additions & 0 deletions experimental/tracer/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"fmt"
"io/ioutil"
"log"
"strconv"
"sync"
)

type Process struct {
sync.Mutex
pid int
tracer *Tracer
threads map[int]*Thread
fds map[int]*Fd
}

func NewProcess(pid int, tracer *Tracer) *Process {
return &Process{
pid: pid,
tracer: tracer,
threads: make(map[int]*Thread),
fds: make(map[int]*Fd),
}
}

func (p *Process) Trace() {
go p.trace()
}

// This doesn't actually guarantees we follow all the threads. Oops.
func (p *Process) trace() {
var (
attached int
)
log.Printf("Tracing process %d", p.pid)

for {
ps, err := ioutil.ReadDir(fmt.Sprintf("/proc/%d/task", p.pid))
if err != nil {
log.Printf("ReadDir failed, pid=%d, err=%v", p.pid, err)
return
}

attached = 0
for _, file := range ps {
pid, err := strconv.Atoi(file.Name())
if err != nil {
log.Printf("'%s' is not a pid: %v", file.Name, err)
attached++
continue
}

p.Lock()
t, ok := p.threads[pid]
if !ok {
t = p.tracer.TraceThread(pid, p)
p.threads[pid] = t
}
p.Unlock()

if !t.Attached() {
continue
}

attached++
}

// When we successfully attach to all threads
// we can be sure to catch new clones, so we
// can quit.
if attached == len(ps) {
break
}
}

log.Printf("Successfully attached to %d threads", attached)
}

func (p *Process) NewThread(thread *Thread) {
p.Lock()
defer p.Unlock()
p.threads[thread.pid] = thread
}

func (p *Process) NewFd(fd *Fd) error {
_, ok := p.fds[fd.fd]
if ok {
return fmt.Errorf("New fd %d, alread exists!", fd.fd)
}
p.fds[fd.fd] = fd
return nil
}

0 comments on commit 96b4687

Please sign in to comment.