Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
- Do not communicate by sharing memory; instead, share memory by
  communicating
- Test skeleton
- Hardcode a limit to ten simultaneous flushed connection
- Stop now correctly wait for dam to be empty
  • Loading branch information
simkim committed Jul 21, 2016
1 parent 2c1cfa0 commit 33c331c
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 49 deletions.
24 changes: 12 additions & 12 deletions cmd/tcpdam/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
func configFromEnv(key string, _default string) string {
if os.Getenv(key) != "" {
return os.Getenv(key)
} else {
return _default
}
return _default
}

func configFromEnvBool(key string, _default bool) bool {
Expand All @@ -25,9 +24,8 @@ func configFromEnvBool(key string, _default bool) bool {
return _default
}
return rc
} else {
return _default
}
return _default
}

var (
Expand Down Expand Up @@ -66,27 +64,27 @@ func teardownPidfile() error {
return err
}

func setupPidfile() (error, bool) {
func setupPidfile() (bool, error) {
if *pidFile == "" {
return nil, false
return false, nil
}
file, err := os.OpenFile(*pidFile, os.O_CREATE|os.O_EXCL|os.O_WRONLY|os.O_TRUNC, 0777)
if err != nil {
return err, false
return false, err
}
pid := os.Getpid()
log.Debugf("Write pid %d to pidfile %s\n", pid, *pidFile)
file.Write([]byte(strconv.Itoa(pid)))
file.Close()
return nil, true
return true, nil
}

var log = logging.MustGetLogger("tcpdam")

func main() {
flag.Parse()
setupLogging()
err, hasPid := setupPidfile()
hasPid, err := setupPidfile()
if err != nil {
log.Errorf("Can't create pid file : %s\n", err.Error())
os.Exit(1)
Expand All @@ -96,9 +94,11 @@ func main() {
}
}
log.Noticef("tcpdam started (%s -> %s)", *listenAddr, *remoteAddr)
dam := tcpdam.NewDam(listenAddr, remoteAddr)
dam := tcpdam.NewDam(*listenAddr, *remoteAddr, *maxParkedProxies)
dam.Logger = log
dam.MaxParkedProxies = *maxParkedProxies
dam.Start()
err = dam.Start()
if err != nil {
log.Errorf("An error occured: %s", err.Error())
}
log.Notice("tcpdam stopped")
}
81 changes: 44 additions & 37 deletions dam.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,26 @@ import (
type Dam struct {
MaxParkedProxies int
open bool
listenAddr *string
remoteAddr *string
listenAddr string
remoteAddr string
raddr *net.TCPAddr
listener *net.TCPListener
parkedProxies []*Proxy
parkedProxiesLock *sync.Mutex
parkedProxiesCond *sync.Cond
parkedProxies chan *Proxy
flushingProxies chan bool
shouldQuitCond sync.Cond
quit chan bool
sigs chan os.Signal
Logger *logging.Logger
}

func NewDam(listenAddr *string, remoteAddr *string) *Dam {
mutex := &sync.Mutex{}
func NewDam(listenAddr string, remoteAddr string, maxParked int) *Dam {
return &Dam{
MaxParkedProxies: 0,
open: false,
listenAddr: listenAddr,
remoteAddr: remoteAddr,
parkedProxies: make([]*Proxy, 0),
parkedProxiesLock: mutex,
parkedProxiesCond: sync.NewCond(mutex),
parkedProxies: make(chan *Proxy, maxParked),
flushingProxies: make(chan bool, 10), //FIXME: update buffer size once the flush algorithm is known
Logger: logging.MustGetLogger("dam"),
shouldQuitCond: sync.Cond{L: &sync.Mutex{}},
}
Expand All @@ -50,23 +47,21 @@ func (dam *Dam) Dial() (*net.TCPConn, error) {
return rconn, err
}

func (dam *Dam) Flushed(p *Proxy) {
<- dam.flushingProxies
}

func (dam *Dam) Push(p *Proxy) {
p.Dam = dam
p.Logger = dam.Logger

dam.parkedProxiesLock.Lock()

for !dam.open && dam.MaxParkedProxies != 0 && len(dam.parkedProxies) >= dam.MaxParkedProxies {
dam.Logger.Debugf("Too many connections, waiting free slots : %d >= %d\n", len(dam.parkedProxies), dam.MaxParkedProxies)
dam.parkedProxiesCond.Wait()
}

if dam.open {
dam.flushingProxies <- true
go p.Flush()
} else {
dam.parkedProxies = append(dam.parkedProxies, p)
dam.parkedProxies <- p
}
dam.parkedProxiesLock.Unlock()

}

func (dam *Dam) Close() {
Expand All @@ -80,12 +75,12 @@ func (dam *Dam) Close() {

func (dam *Dam) Open() error {
if dam.open {
dam.Logger.Debugf("Already opened")
dam.Logger.Debug("Already opened")
return nil
}
dam.Logger.Debugf("Resolving %s\n", *dam.remoteAddr)
dam.Logger.Debugf("Resolving %s\n", dam.remoteAddr)
var err error
dam.raddr, err = net.ResolveTCPAddr("tcp", *dam.remoteAddr)
dam.raddr, err = net.ResolveTCPAddr("tcp", dam.remoteAddr)
if err != nil {
dam.Logger.Warningf("Can't resolve remote addr %s\n", err.Error())
return err
Expand All @@ -100,26 +95,31 @@ func (dam *Dam) Open() error {
func (dam *Dam) Flush() {
dam.Logger.Debug("Flushing dam requested")

dam.parkedProxiesLock.Lock()
dam.Logger.Debug("Flushing dam")
for _, proxy := range dam.parkedProxies {
go proxy.Flush()
for {
dam.Logger.Debug("Flushing dam ...")
select {
case p := <- dam.parkedProxies:
dam.flushingProxies <- true
go p.Flush()
default:
goto end
}
}
dam.parkedProxies = make([]*Proxy, 0)
dam.parkedProxiesLock.Unlock()

dam.parkedProxiesCond.Broadcast()
end:
dam.Logger.Debug("Flushing dam done")
}

func (dam *Dam) Start() {
laddr, err := net.ResolveTCPAddr("tcp", *dam.listenAddr)
func (dam *Dam) Start() error {
laddr, err := net.ResolveTCPAddr("tcp", dam.listenAddr)
if err != nil {
panic(err)
dam.Logger.Errorf("Can't resolve listen address: %s", err.Error())
return err
}

dam.listener, err = net.ListenTCP("tcp", laddr)
if err != nil {
panic(err)
dam.Logger.Errorf("Can't listen: %s", err.Error())
return err
}
dam.quit = make(chan bool, 1)
defer dam.StopListeningSignal()
Expand All @@ -135,12 +135,13 @@ func (dam *Dam) Start() {
case <-dam.quit:
dam.Logger.Debug("Received quit -> return")
dam.listener.Close()
return
dam.waitEmpty()
return nil
default:
continue
}
} else {
panic(err)
return err
}
}
p := &Proxy{
Expand All @@ -155,7 +156,13 @@ func (dam *Dam) Stop() {
dam.quit <- true
}

func (dam *Dam) WaitEmpty() {
func (dam *Dam) waitEmpty() {
dam.Logger.Debug("Wait the dam to become empty")
for len(dam.flushingProxies) > 0 {
dam.Logger.Debug("Wait the dam to become empty loop")
time.Sleep(1*time.Second)
}
dam.Logger.Debug("Wait the dam to become empty loop done")
}

func (dam *Dam) ListenSignal() {
Expand Down
29 changes: 29 additions & 0 deletions dam_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package tcpdam_test

import "github.com/simkim/tcpdam"
import "testing"

// disclamer : I don't now how to test code in got yet ...

var (
la = ":9999"
ra = "example.com:80"
)

func TestCreateDam(t *testing.T) {
dam := tcpdam.NewDam(la, ra)
if dam == nil {
t.Errorf("Dam should not be nil")
}
}

func TestOpeningDam(t *testing.T) {
dam := tcpdam.NewDam(la, ra)
if dam == nil {
t.Errorf("Dam should not be nil")
}
err := dam.Open()
if err != nil {
t.Errorf("Can't open the dam : %s", err.Error)
}
}
1 change: 1 addition & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Proxy struct {
func (p *Proxy) Flush() error {
p.Logger.Debug("Flush connection")
defer p.Lconn.Close()
defer p.Dam.Flushed(p)
Rconn, err := p.Dam.Dial()
if err != nil {
return err
Expand Down

0 comments on commit 33c331c

Please sign in to comment.