Update stats in conjure-app #78

Merged
merged 2 commits on Mar 20, 2021
46 changes: 25 additions & 21 deletions application/lib/proxies.go
@@ -90,29 +90,17 @@ func halfPipe(src, dst net.Conn,

var proxyStartTime = time.Now()

// using io.CopyBuffer doesn't let us see
// bytes / second (until very end of connect, then only avg)
// But io.CopyBuffer is very performant:
// actually doesn't use a buffer at all, just splices sockets
// together at the kernel level.
//
// We could try to use io.CopyN in a loop or something that
// gives us occasional bytes. CopyN would not splice, though
// (uses a LimitedReader that only calls Read)
buf := bufferPool.Get().([]byte)
written, err := io.CopyBuffer(dst, src, buf)
oncePrintErr.Do(
func() {
proxyEndTime := time.Since(proxyStartTime)
if err == nil {
stats := sessionStats{
Duration: int64(proxyEndTime / time.Millisecond),
Written: written,
Tag: tag,
Err: ""}
stats_str, _ := json.Marshal(stats)
logger.Printf("gracefully stopping forwarding %s", stats_str)
} else {
stats := sessionStats{
Duration: int64(proxyEndTime / time.Millisecond),
Written: written,
Tag: tag,
Err: err.Error()}
stats_str, _ := json.Marshal(stats)
logger.Printf("stopping forwarding due to err %s", stats_str)
}
})
if closeWriter, ok := dst.(interface {
CloseWrite() error
}); ok {
@@ -128,6 +116,22 @@ func halfPipe(src, dst net.Conn,
} else {
src.Close()
}
proxyEndTime := time.Since(proxyStartTime)
stats := sessionStats{
Duration: int64(proxyEndTime / time.Millisecond),
Written: written,
Tag: tag,
Err: ""}
if err != nil {
stats.Err = err.Error()
}
stats_str, _ := json.Marshal(stats)
logger.Printf("stopping forwarding %s", stats_str)
if strings.HasPrefix(tag, "Up") {
Stat().AddBytesUp(written)
} else {
Stat().AddBytesDown(written)
}
wg.Done()
}

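The comment removed above explains the trade-off this hunk settles: io.CopyBuffer only yields a byte count once the copy finishes, so the new code records Written a single time when the session ends and credits it to AddBytesUp or AddBytesDown by tag prefix. That is also why stats.go below carries a TODO about newBytesUp/newBytesDown being jumpy. A minimal sketch of the chunked alternative hinted at in the old comment, not part of this PR; copyWithStats, the chunk size, and the dir argument are illustrative assumptions layered on the AddBytes helper added below.

// Sketch only, not part of this PR: copy in fixed-size chunks so the byte
// counters move while the proxy session is still running, at the cost of the
// splice fast path that io.CopyBuffer can take.
package lib

import (
    "errors"
    "io"
)

func copyWithStats(dst io.Writer, src io.Reader, dir string) (int64, error) {
    const chunk = 32 * 1024 // illustrative chunk size
    var total int64
    for {
        n, err := io.CopyN(dst, src, chunk)
        total += n
        Stat().AddBytes(n, dir) // dir is "Up" or "Down"; counters update every chunk
        if errors.Is(err, io.EOF) {
            return total, nil // source drained cleanly
        }
        if err != nil {
            return total, err
        }
    }
}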
3 changes: 3 additions & 0 deletions application/lib/registration.go
@@ -648,6 +648,9 @@ func (r *RegisteredDecoys) removeRegistration(index string) *regExpireLogMsg {
RegCount: expiredRegObj.regCount,
}

// Update stats
Stat().ExpireReg(expiredRegObj.DecoyListVersion, expiredRegObj.RegistrationSource)

// remove from timeout tracking
delete(r.decoysTimeouts, index)

169 changes: 169 additions & 0 deletions application/lib/stats.go
@@ -0,0 +1,169 @@
package lib

import (
"log"
"os"
"sync"
"sync/atomic"
"time"

pb "github.com/refraction-networking/gotapdance/protobuf"
)

// We would use uint64, but we want to atomically subtract sometimes
type Stats struct {
logger *log.Logger
activeConns int64 // incremented on add, decremented on remove, not reset
newConns int64 // new connections since last stats.reset()
newErrConns int64 // new connections that had some sort of error since last reset()

activeRegistrations int64 // Current number of active registrations we have
activeLocalRegistrations int64 // Current registrations that were picked up from this detector (also included in newRegistrations)
activeApiRegistrations int64 // Current registrations that we heard about from the API (also included in newRegistrations)
newRegistrations int64 // Added registrations since last reset()
newMissedRegistrations int64 // number of "missed" registrations (as seen by a connection with no registration)
newErrRegistrations int64 // number of registrations that had some kinda error
newDupRegistrations int64 // number of duplicate registrations (doesn't uniquify, so might have some double counting)

newLivenessPass int64 // Liveness tests that passed (non-live phantom) since reset()
newLivenessFail int64 // Liveness tests that failed (live phantom) since reset()

genMutex *sync.Mutex // Lock for generations map
generations map[uint32]int64 // Map from ClientConf generation to number of registrations we saw using it

newBytesUp int64 // TODO: need to redo halfPipe to make this not really jumpy
newBytesDown int64 // ditto
}

var statInstance Stats
var statsOnce sync.Once

// Returns our singleton stats
func Stat() *Stats {
statsOnce.Do(initStats)
return &statInstance
}

func initStats() {
logger := log.New(os.Stdout, "[STATS] ", log.Ldate|log.Lmicroseconds)
statInstance = Stats{
logger: logger,
generations: make(map[uint32]int64),
genMutex: &sync.Mutex{},
}

// Periodic PrintStats()
ticker := time.NewTicker(5 * time.Second)
go func() {
for {
select {
case <-ticker.C:
statInstance.PrintStats()
}
}
}()
}

func (s *Stats) Reset() {
atomic.StoreInt64(&s.newConns, 0)
atomic.StoreInt64(&s.newRegistrations, 0)
atomic.StoreInt64(&s.newMissedRegistrations, 0)
atomic.StoreInt64(&s.newErrRegistrations, 0)
atomic.StoreInt64(&s.newDupRegistrations, 0)
atomic.StoreInt64(&s.newLivenessPass, 0)
atomic.StoreInt64(&s.newLivenessFail, 0)
atomic.StoreInt64(&s.newBytesUp, 0)
atomic.StoreInt64(&s.newBytesDown, 0)
}

func (s *Stats) PrintStats() {
s.logger.Printf("Conns: %d cur %d new %d err Regs: %d cur (%d local %d API) %d new %d miss %d err %d dup LiveT: %d valid %d live Byte: %d up %d down",
atomic.LoadInt64(&s.activeConns), atomic.LoadInt64(&s.newConns), atomic.LoadInt64(&s.newErrConns),
atomic.LoadInt64(&s.activeRegistrations),
atomic.LoadInt64(&s.activeLocalRegistrations), atomic.LoadInt64(&s.activeApiRegistrations),
atomic.LoadInt64(&s.newRegistrations),
atomic.LoadInt64(&s.newMissedRegistrations),
atomic.LoadInt64(&s.newErrRegistrations), atomic.LoadInt64(&s.newDupRegistrations),
atomic.LoadInt64(&s.newLivenessPass), atomic.LoadInt64(&s.newLivenessFail),
atomic.LoadInt64(&s.newBytesUp), atomic.LoadInt64(&s.newBytesDown))
s.Reset()
}

func (s *Stats) AddConn() {
atomic.AddInt64(&s.activeConns, 1)
atomic.AddInt64(&s.newConns, 1)
}

func (s *Stats) CloseConn() {
atomic.AddInt64(&s.activeConns, -1)
}

func (s *Stats) ConnErr() {
atomic.AddInt64(&s.activeConns, -1)
atomic.AddInt64(&s.newErrConns, 1)
}

func (s *Stats) AddReg(generation uint32, source *pb.RegistrationSource) {
atomic.AddInt64(&s.activeRegistrations, 1)
atomic.AddInt64(&s.newRegistrations, 1)

if *source == pb.RegistrationSource_DetectorPrescan {
atomic.AddInt64(&s.activeLocalRegistrations, 1)
//atomic.AddInt64(&s.newLocalRegistrations, 1) // I don't think a delta of this is super useful...
} else {
atomic.AddInt64(&s.activeApiRegistrations, 1)
//atomic.AddInt64(&s.newApiRegistrations, 1)
}
s.genMutex.Lock()
s.generations[generation] += 1
s.genMutex.Unlock()
}

func (s *Stats) AddDupReg() {
atomic.AddInt64(&s.newDupRegistrations, 1)
}

func (s *Stats) AddErrReg() {
atomic.AddInt64(&s.newErrRegistrations, 1)
}

func (s *Stats) ExpireReg(generation uint32, source *pb.RegistrationSource) {
atomic.AddInt64(&s.activeRegistrations, -1)

if *source == pb.RegistrationSource_DetectorPrescan {
atomic.AddInt64(&s.activeLocalRegistrations, -1)
} else {
atomic.AddInt64(&s.activeApiRegistrations, -1)
}
s.genMutex.Lock()
s.generations[generation] -= 1
s.genMutex.Unlock()
}

func (s *Stats) AddMissedReg() {
atomic.AddInt64(&s.newMissedRegistrations, 1)
}

func (s *Stats) AddLivenessPass() {
atomic.AddInt64(&s.newLivenessPass, 1)
}

func (s *Stats) AddLivenessFail() {
atomic.AddInt64(&s.newLivenessFail, 1)
}

func (s *Stats) AddBytesUp(n int64) {
atomic.AddInt64(&s.newBytesUp, n)
}

func (s *Stats) AddBytesDown(n int64) {
atomic.AddInt64(&s.newBytesDown, n)
}

func (s *Stats) AddBytes(n int64, dir string) {
if dir == "Up" {
s.AddBytesUp(n)
} else {
s.AddBytesDown(n)
}
}
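The counters come in two kinds: active* gauges that move in matched pairs (AddConn with CloseConn or ConnErr, AddReg with ExpireReg as wired into registration.go above) and new* deltas that PrintStats logs and Reset zeroes every five seconds. A minimal, hypothetical caller showing the intended pairing for connections; handleConn and the serve callback are illustrative, not code from this PR.

package lib

import "net"

// handleConn is illustrative only: every AddConn is balanced by exactly one
// of ConnErr or CloseConn, so activeConns tracks live connections while
// newConns and newErrConns are per-interval deltas wiped by PrintStats.
func handleConn(conn net.Conn, serve func(net.Conn) error) {
    Stat().AddConn()
    defer conn.Close()
    if err := serve(conn); err != nil {
        Stat().ConnErr() // decrements activeConns and counts the error
        return
    }
    Stat().CloseConn() // decrements activeConns only
}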
18 changes: 17 additions & 1 deletion application/main.go
@@ -79,6 +79,7 @@ func handleNewConn(regManager *cj.RegistrationManager, clientConn *net.TCPConn)

count := regManager.CountRegistrations(originalDstIP)
logger.Printf("new connection (%d potential registrations)\n", count)
cj.Stat().AddConn()

// Pick random timeout between 10 and 60 seconds, down to millisecond precision
ms := rand.Int63n(50000) + 10000
@@ -102,6 +103,8 @@ func handleNewConn(regManager *cj.RegistrationManager, clientConn *net.TCPConn)
// in userspace before the SYN-ACK is sent, increasing probe
// resistance.
logger.Printf("no possible registrations, reading for %v then dropping connection\n", timeout)
cj.Stat().AddMissedReg()
cj.Stat().CloseConn()

// Copy into ioutil.Discard to keep ACKing until the deadline.
// This should help prevent fingerprinting; if we let the read
@@ -122,13 +125,15 @@ readLoop:
for {
if len(possibleTransports) < 1 {
logger.Printf("ran out of possible transports, reading for %v then giving up\n", time.Until(deadline))
cj.Stat().ConnErr()
io.Copy(ioutil.Discard, clientConn)
return
}

n, err := clientConn.Read(buf[:])
if err != nil {
logger.Printf("got error while reading from connection, giving up after %d bytes: %v\n", received.Len(), err)
cj.Stat().ConnErr()
return
}
received.Write(buf[:n])
@@ -149,6 +154,7 @@ readLoop:
// may no longer be valid. We should just give up on this connection.
d := time.Until(deadline)
logger.Printf("got unexpected error from transport %s, sleeping %v then giving up: %v\n", t.Name(), d, err)
cj.Stat().ConnErr()
time.Sleep(d)
return
}
@@ -162,6 +168,7 @@ readLoop:
}

cj.Proxy(reg, wrapped, logger)
cj.Stat().CloseConn()
}

func get_zmq_updates(connectAddr string, regManager *cj.RegistrationManager, conf *cj.Config) {
@@ -200,11 +207,13 @@ func get_zmq_updates(connectAddr string, regManager *cj.RegistrationManager, con
if regManager.RegistrationExists(reg) {
// log phantom IP, shared secret, ipv6 support
logger.Printf("Duplicate registration: %v %s\n", reg.IDString(), reg.RegistrationSource)
cj.Stat().AddDupReg()

// Track the received registration, if it is already tracked it will just update the record
err := regManager.TrackRegistration(reg)
if err != nil {
logger.Println("error tracking registration: ", err)
cj.Stat().AddErrReg()
}
continue
}
@@ -216,11 +225,13 @@ func get_zmq_updates(connectAddr string, regManager *cj.RegistrationManager, con
err := regManager.TrackRegistration(reg)
if err != nil {
logger.Println("error tracking registration: ", err)
cj.Stat().AddErrReg()
}

// If registration is trying to connect to a dark decoy that is blocklisted continue
if reg.Covert == "" || conf.IsBlocklisted(reg.Covert) {
logger.Printf("Dropping reg, malformed or blocklisted covert: %v, %s, %v", reg.IDString(), reg.Covert, err)
cj.Stat().AddErrReg()
continue
}

@@ -229,16 +240,17 @@ func get_zmq_updates(connectAddr string, regManager *cj.RegistrationManager, con
liveness, response := reg.PhantomIsLive()
if liveness == true {
logger.Printf("Dropping registration %v -- live phantom: %v\n", reg.IDString(), response)
cj.Stat().AddLivenessFail()
continue
}
cj.Stat().AddLivenessPass()
}

if conf.EnableShareOverAPI && *reg.RegistrationSource == pb.RegistrationSource_Detector {
// Registration received from decoy-registrar, share over API if enabled.
go tryShareRegistrationOverAPI(reg, conf.PreshareEndpoint)
}


if conf.IsBlocklistedPhantom(reg.DarkDecoy) {
// Note: Phantom blocklist is applied at this stage because the phantom may only be blocked on this
// station. We may want other stations to be informed about the registration, but prevent this station
@@ -250,6 +262,7 @@ func get_zmq_updates(connectAddr string, regManager *cj.RegistrationManager, con
// validate the registration
regManager.AddRegistration(reg)
logger.Printf("Adding registration %v\n", reg.IDString())
cj.Stat().AddReg(reg.DecoyListVersion, reg.RegistrationSource)
}
}()

@@ -383,6 +396,9 @@ func main() {
logClientIP = false
}

// Init stats
cj.Stat()

// parse toml station configuration
conf, err := cj.ParseConfig()
if err != nil {
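Taken together, the main.go changes give every exit path of handleNewConn a counter (AddConn on accept, then ConnErr, AddMissedReg plus CloseConn, or CloseConn), mirror the registration path in get_zmq_updates (AddDupReg, AddErrReg, the liveness counters, and finally AddReg), and call cj.Stat() once in main purely to run initStats so the five-second PrintStats ticker is running before the first connection arrives. The condensed sketch below restates the connection bookkeeping; instrumentedHandle, lookup, and proxy are hypothetical, and the cj import path is assumed from the repository layout.

// Sketch only: a condensed restatement of handleNewConn's stats bookkeeping.
package sketch

import (
    "net"

    cj "github.com/refraction-networking/conjure/application/lib" // import path assumed
)

// Every AddConn ends in exactly one of ConnErr, AddMissedReg+CloseConn, or CloseConn.
func instrumentedHandle(conn net.Conn, lookup func(net.Conn) (bool, error), proxy func(net.Conn)) {
    cj.Stat().AddConn()
    found, err := lookup(conn)
    if err != nil {
        cj.Stat().ConnErr() // read or transport failure
        return
    }
    if !found {
        cj.Stat().AddMissedReg() // connection with no matching registration
        cj.Stat().CloseConn()
        return
    }
    proxy(conn) // stand-in for transport wrapping and cj.Proxy
    cj.Stat().CloseConn()
}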