Skip to content

Commit

Permalink
refactor: refactor run.go
Browse files Browse the repository at this point in the history
  • Loading branch information
md-irohas committed Jul 2, 2023
1 parent 4c35f89 commit a22a2fa
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 113 deletions.
258 changes: 146 additions & 112 deletions rcap/run.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package rcap

import (
"io"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
)

Expand All @@ -16,152 +18,184 @@ const (
SamplingDump = 10000
)

func Run(config *Config) error {
var reader *Reader
var writer *Writer
var err error
type Runner struct {
config *Config
reader *Reader
writer *Writer
doExit bool
doReload bool
numCapturedPackets uint64
numSampledPackets uint64
}

doExit := false
doReload := false
func NewRunner(c *Config) (*Runner, error) {
r := &Runner{
config: c,
doExit: false,
doReload: false,
numCapturedPackets: 0,
numSampledPackets: 0,
}

// Trap signals.
log.Println("trap signals (send SIGHUP to reload, SIGINT or SIGTERM to exit).")
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
return r, nil
}

go func() {
for {
s := <-sigc
log.Println("SIGNAL:", s)
func (r *Runner) Reload() error {
if r.config.Filename == "" {
return errors.New("no config file is set.")
}

switch s {
case syscall.SIGHUP:
doReload = true
case syscall.SIGINT, syscall.SIGTERM:
doExit = true
}
}
}()
newConfig, err := LoadConfig(r.config.Filename)
if err != nil {
return err
}

defer func() {
if reader != nil {
reader.Close()
log.Println("close reader.")
}
if writer != nil {
writer.Close()
log.Println("close writer.")
}
}()
// TODO: re-init reader and writer only when configuration has changed.
r.Close()

var numSampledPackets, numCapturedPackets uint64 = 0, 0
log.Println("reload config and use the new config.")
r.config = newConfig
r.config.PrintToLog()

CAPTURE_LOOP:
for {
if doExit {
break CAPTURE_LOOP
}
return nil
}

// If the doReload flag is set, reload configurations from the file and
// close the current reader and writer. If configurations reloaded
// are invalid, do nothing.
if doReload {
if config.Filename == "" {
log.Println("failed to reload config. no config file is set.")
} else {
if newConfig, err := LoadConfig(config.Filename); err == nil {
// TODO: re-init reader and writer only when configuration has
// changed.
if reader != nil {
reader.Close()
reader = nil
}
if writer != nil {
writer.Close()
writer = nil
}

log.Println("reload config and use the new config.")
config = newConfig
} else {
log.Printf("failed to reload config: %v", err)
log.Println("use the previous config instead.")
}
}
func (r *Runner) setupReaderAndWriter() error {
var err error

config.PrintToLog()
doReload = false
if r.reader == nil {
r.reader, err = NewReader(r.config)
if err != nil {
return err
}
}

// Create reader and writer instances if they are not ready.
if reader == nil {
reader, err = NewReader(config)
if err != nil {
return err
}
}
if writer == nil {
linkType := reader.LinkType()
writer, err = NewWriter(config, linkType)
if err != nil {
return err
}
}
if r.writer == nil {
// The current NewWriter returns no error.
r.writer, _ = NewWriter(r.config, r.reader.LinkType())
}

return nil
}

func (r *Runner) getTimestamp(capinfo gopacket.CaptureInfo, pkterr error) int64 {
if r.config.Rcap.UseSystemTime {
return time.Now().Unix()
}

if pkterr != nil {
return time.Now().Unix()
}

return capinfo.Timestamp.Unix()
}

data, capinfo, pkterr := reader.ReadPacket()
func (r *Runner) isSamplingMode() bool {
return r.config.Rcap.SamplingMode
}

func (r *Runner) doSample() bool {
return Random() < r.config.Rcap.Sampling
}

var curTime int64
if config.Rcap.UseSystemTime {
curTime = time.Now().Unix()
} else {
if pkterr != nil {
curTime = time.Now().Unix()
} else {
curTime = capinfo.Timestamp.Unix()
func (r *Runner) Run() error {
for !r.doExit {
if r.doReload {
if err := r.Reload(); err != nil {
log.Printf("failed to reload config: %v", err)
}
r.doReload = false
}

err = writer.Update(curTime)
err := r.setupReaderAndWriter()
if err != nil {
log.Printf("failed to update writer: %v", err)
return err
return fmt.Errorf("failed to setup reader/writer: %w", err)
}

data, capinfo, pkterr := r.reader.ReadPacket()
currentTime := r.getTimestamp(capinfo, pkterr)

err = r.writer.Update(currentTime)
if err != nil {
return fmt.Errorf("failed to update writer: %w", err)
}

if pkterr != nil {
switch pkterr {
// Do NOT log messages when it is timeouted.
case pcap.NextErrorTimeoutExpired:
// The reader is already closed.
case io.EOF:
break CAPTURE_LOOP
// Go to next loop.
// Do NOT log messages when it is timeouted.
continue
default:
log.Printf("failed to read packet: %v", pkterr)
// Return error (unexpected error).
return fmt.Errorf("failed to read packet: %w", pkterr)
}

continue
}

numCapturedPackets++

if config.Rcap.SamplingMode {
sample := (Random() <= config.Rcap.Sampling)
r.numCapturedPackets++
if r.isSamplingMode() {
doSample := r.doSample()

if numCapturedPackets%SamplingDump == 0 {
log.Printf("sampling result: %d/%d\n", numSampledPackets, numCapturedPackets)
if doSample {
r.numSampledPackets++
}

if sample {
numSampledPackets++
} else {
if r.numCapturedPackets%SamplingDump == 0 {
samplingRatio := float32(r.numSampledPackets) / float32(r.numCapturedPackets) * 100
log.Printf("sampling result: %v/%v (%.2f%%)\n", r.numSampledPackets, r.numCapturedPackets, samplingRatio)
}
if !doSample {
continue
}
}

err := writer.WritePacket(capinfo, data)
err = r.writer.WritePacket(capinfo, data)
if err != nil {
log.Printf("failed to write packet: %v", err)
return fmt.Errorf("failed to write packet: %w", err)
}
}

return nil
}

func (r *Runner) Close() {
if r.reader != nil {
r.reader.Close()
r.reader = nil
log.Println("close reader.")
}
if r.writer != nil {
r.writer.Close()
r.writer = nil
log.Println("close writer.")
}
}

func Run(config *Config) error {
r, err := NewRunner(config)
if err != nil {
return err
}

// Trap signals.
log.Println("trap signals (send SIGHUP to reload, SIGINT or SIGTERM to exit).")
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

go func() {
for {
s := <-sigc
log.Println("SIGNAL:", s)

switch s {
case syscall.SIGHUP:
r.doReload = true
case syscall.SIGINT, syscall.SIGTERM:
r.doExit = true
}
}
}()

defer r.Close()

return r.Run()
}
2 changes: 1 addition & 1 deletion rcap/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (w *Writer) openWriter(ts int64) error {
return err
}

log.Printf("dump packets into a file: %v", fileName)
log.Printf("dump packets into a file: %v (append: %v)", fileName, !isNewFile)

// Make a new writer.
writer := pcapgo.NewWriter(file)
Expand Down

0 comments on commit a22a2fa

Please sign in to comment.