Permalink
Browse files

Save records to file or stdout

  • Loading branch information...
1 parent c24a8ed commit 5d4999a68e6b8980dcf97077dff0229219d4960b @temoto committed Dec 6, 2012
Showing with 51 additions and 8 deletions.
  1. +41 −1 oakmole-radar/main.go
  2. +10 −7 oakmole-radar/net.go
View
@@ -2,17 +2,20 @@ package main
import (
"flag"
+ "github.com/temoto/oakmole/oakmole"
"log"
"os"
"os/signal"
"runtime"
"runtime/pprof"
+ "sync"
"syscall"
"time"
)
func main() {
bind := flag.String("bind", ":80", "Listen on address:port")
+ outPath := flag.String("out", "-", "Path to dump file. '-' for stdout")
cpuprofile := flag.String("cpuprofile", "", "Write CPU profile to file")
memprofile := flag.String("memprofile", "", "Write memory profile to file")
flag.Parse()
@@ -46,7 +49,10 @@ func main() {
// Set number of parallel threads to number of CPUs.
runtime.GOMAXPROCS(runtime.NumCPU())
+ // Tweak record channel buffer size.
+ rch := make(chan *oakmole.Record, 100)
stop := make(chan bool)
+ workers := new(sync.WaitGroup)
sigIntChan := make(chan os.Signal, 1)
signal.Notify(sigIntChan, syscall.SIGINT)
@@ -55,8 +61,42 @@ func main() {
stop <- true
}()
- listen := startListener(bind)
+ var outFile *os.File
+ var err error
+ if *outPath != "-" {
+ outFile, err = os.OpenFile(*outPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
+ if err != nil {
+ log.Println("Writer: Open:", err)
+ os.Exit(1)
+ }
+ } else {
+ outFile = os.Stdout
+ }
+ // Buffer
+ workers.Add(1)
+ go func() {
+ for record := range rch {
+ b, err := record.Marshal()
+ if err != nil {
+ log.Println("Writer: Marshal:", err)
+ }
+ if _, err = outFile.Write(b); err != nil {
+ log.Println("Writer: Write:", err)
+ }
+ }
+ workers.Done()
+ }()
+
+ listen := startListener(bind, rch)
defer listen.Close()
<-stop
+ close(rch)
+ workers.Wait()
+ if outFile != os.Stdout {
+ if err = outFile.Sync(); err != nil {
+ log.Println("Writer: Sync:", err)
+ os.Exit(1)
+ }
+ }
}
View
@@ -13,7 +13,7 @@ import (
const IOTimeout time.Duration = 10 * time.Second
const ReadBufferSize = 16 << 10
-func acceptLoop(listen *net.TCPListener) {
+func acceptLoop(listen *net.TCPListener, out chan *oakmole.Record) {
// TODO: try runtime.LockOSThread()
// TODO: create buffers
for {
@@ -22,11 +22,11 @@ func acceptLoop(listen *net.TCPListener) {
log.Println(err)
return
}
- go connectionHandler(conn)
+ go connectionHandler(conn, out)
}
}
-func connectionHandler(conn *net.TCPConn) {
+func connectionHandler(conn *net.TCPConn, out chan *oakmole.Record) {
defer conn.Close()
timeBegin := time.Now()
conn.SetKeepAlive(false)
@@ -68,12 +68,15 @@ func connectionHandler(conn *net.TCPConn) {
HttpHost: readHost(buffer),
Body: buffer,
}
- log.Println("Read: success local:", addrLocal, "remote:", addrRemote, "size:", totalSize, "first bytes:", string(buffer[:20]))
+ // log.Println("Read: success local:", addrLocal, "remote:", addrRemote, "size:", totalSize, "first bytes:", string(buffer[:20]))
- // TODO: save
+ // t1 := time.Now()
+ out <- record
+ // outSendTime := time.Now().Sub(t1)
+ // log.Println("connectionHandler: out<- time:", outSendTime)
}
-func startListener(bind *string) *net.TCPListener {
+func startListener(bind *string, out chan *oakmole.Record) *net.TCPListener {
addr, err := net.ResolveTCPAddr("tcp", *bind)
if err != nil {
log.Fatalln(err)
@@ -85,7 +88,7 @@ func startListener(bind *string) *net.TCPListener {
}
for i := 1; i <= runtime.NumCPU(); i++ {
- go acceptLoop(listen)
+ go acceptLoop(listen, out)
}
return listen
}

0 comments on commit 5d4999a

Please sign in to comment.