From a22a2fa20110d17f14f0f910f9b9b9e7f03a1a76 Mon Sep 17 00:00:00 2001 From: "md.irohas" Date: Mon, 3 Jul 2023 02:02:20 +0900 Subject: [PATCH] refactor: refactor run.go --- rcap/run.go | 258 ++++++++++++++++++++++++++++--------------------- rcap/writer.go | 2 +- 2 files changed, 147 insertions(+), 113 deletions(-) diff --git a/rcap/run.go b/rcap/run.go index 00e2217..bc4a2f5 100644 --- a/rcap/run.go +++ b/rcap/run.go @@ -1,13 +1,15 @@ package rcap import ( - "io" + "errors" + "fmt" "log" "os" "os/signal" "syscall" "time" + "github.com/google/gopacket" "github.com/google/gopacket/pcap" ) @@ -16,152 +18,184 @@ const ( SamplingDump = 10000 ) -func Run(config *Config) error { - var reader *Reader - var writer *Writer - var err error +type Runner struct { + config *Config + reader *Reader + writer *Writer + doExit bool + doReload bool + numCapturedPackets uint64 + numSampledPackets uint64 +} - doExit := false - doReload := false +func NewRunner(c *Config) (*Runner, error) { + r := &Runner{ + config: c, + doExit: false, + doReload: false, + numCapturedPackets: 0, + numSampledPackets: 0, + } - // Trap signals. - log.Println("trap signals (send SIGHUP to reload, SIGINT or SIGTERM to exit).") - sigc := make(chan os.Signal, 1) - signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + return r, nil +} - go func() { - for { - s := <-sigc - log.Println("SIGNAL:", s) +func (r *Runner) Reload() error { + if r.config.Filename == "" { + return errors.New("no config file is set.") + } - switch s { - case syscall.SIGHUP: - doReload = true - case syscall.SIGINT, syscall.SIGTERM: - doExit = true - } - } - }() + newConfig, err := LoadConfig(r.config.Filename) + if err != nil { + return err + } - defer func() { - if reader != nil { - reader.Close() - log.Println("close reader.") - } - if writer != nil { - writer.Close() - log.Println("close writer.") - } - }() + // TODO: re-init reader and writer only when configuration has changed. + r.Close() - var numSampledPackets, numCapturedPackets uint64 = 0, 0 + log.Println("reload config and use the new config.") + r.config = newConfig + r.config.PrintToLog() -CAPTURE_LOOP: - for { - if doExit { - break CAPTURE_LOOP - } + return nil +} - // If the doReload flag is set, reload configurations from the file and - // close the current reader and writer. If configurations reloaded - // are invalid, do nothing. - if doReload { - if config.Filename == "" { - log.Println("failed to reload config. no config file is set.") - } else { - if newConfig, err := LoadConfig(config.Filename); err == nil { - // TODO: re-init reader and writer only when configuration has - // changed. - if reader != nil { - reader.Close() - reader = nil - } - if writer != nil { - writer.Close() - writer = nil - } - - log.Println("reload config and use the new config.") - config = newConfig - } else { - log.Printf("failed to reload config: %v", err) - log.Println("use the previous config instead.") - } - } +func (r *Runner) setupReaderAndWriter() error { + var err error - config.PrintToLog() - doReload = false + if r.reader == nil { + r.reader, err = NewReader(r.config) + if err != nil { + return err } + } - // Create reader and writer instances if they are not ready. - if reader == nil { - reader, err = NewReader(config) - if err != nil { - return err - } - } - if writer == nil { - linkType := reader.LinkType() - writer, err = NewWriter(config, linkType) - if err != nil { - return err - } - } + if r.writer == nil { + // The current NewWriter returns no error. + r.writer, _ = NewWriter(r.config, r.reader.LinkType()) + } + + return nil +} + +func (r *Runner) getTimestamp(capinfo gopacket.CaptureInfo, pkterr error) int64 { + if r.config.Rcap.UseSystemTime { + return time.Now().Unix() + } + + if pkterr != nil { + return time.Now().Unix() + } + + return capinfo.Timestamp.Unix() +} - data, capinfo, pkterr := reader.ReadPacket() +func (r *Runner) isSamplingMode() bool { + return r.config.Rcap.SamplingMode +} + +func (r *Runner) doSample() bool { + return Random() < r.config.Rcap.Sampling +} - var curTime int64 - if config.Rcap.UseSystemTime { - curTime = time.Now().Unix() - } else { - if pkterr != nil { - curTime = time.Now().Unix() - } else { - curTime = capinfo.Timestamp.Unix() +func (r *Runner) Run() error { + for !r.doExit { + if r.doReload { + if err := r.Reload(); err != nil { + log.Printf("failed to reload config: %v", err) } + r.doReload = false } - err = writer.Update(curTime) + err := r.setupReaderAndWriter() if err != nil { - log.Printf("failed to update writer: %v", err) - return err + return fmt.Errorf("failed to setup reader/writer: %w", err) + } + + data, capinfo, pkterr := r.reader.ReadPacket() + currentTime := r.getTimestamp(capinfo, pkterr) + + err = r.writer.Update(currentTime) + if err != nil { + return fmt.Errorf("failed to update writer: %w", err) } if pkterr != nil { switch pkterr { - // Do NOT log messages when it is timeouted. case pcap.NextErrorTimeoutExpired: - // The reader is already closed. - case io.EOF: - break CAPTURE_LOOP + // Go to next loop. + // Do NOT log messages when it is timeouted. + continue default: - log.Printf("failed to read packet: %v", pkterr) + // Return error (unexpected error). + return fmt.Errorf("failed to read packet: %w", pkterr) } - - continue } - numCapturedPackets++ - - if config.Rcap.SamplingMode { - sample := (Random() <= config.Rcap.Sampling) + r.numCapturedPackets++ + if r.isSamplingMode() { + doSample := r.doSample() - if numCapturedPackets%SamplingDump == 0 { - log.Printf("sampling result: %d/%d\n", numSampledPackets, numCapturedPackets) + if doSample { + r.numSampledPackets++ } - - if sample { - numSampledPackets++ - } else { + if r.numCapturedPackets%SamplingDump == 0 { + samplingRatio := float32(r.numSampledPackets) / float32(r.numCapturedPackets) * 100 + log.Printf("sampling result: %v/%v (%.2f%%)\n", r.numSampledPackets, r.numCapturedPackets, samplingRatio) + } + if !doSample { continue } } - err := writer.WritePacket(capinfo, data) + err = r.writer.WritePacket(capinfo, data) if err != nil { - log.Printf("failed to write packet: %v", err) + return fmt.Errorf("failed to write packet: %w", err) } } return nil } + +func (r *Runner) Close() { + if r.reader != nil { + r.reader.Close() + r.reader = nil + log.Println("close reader.") + } + if r.writer != nil { + r.writer.Close() + r.writer = nil + log.Println("close writer.") + } +} + +func Run(config *Config) error { + r, err := NewRunner(config) + if err != nil { + return err + } + + // Trap signals. + log.Println("trap signals (send SIGHUP to reload, SIGINT or SIGTERM to exit).") + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) + + go func() { + for { + s := <-sigc + log.Println("SIGNAL:", s) + + switch s { + case syscall.SIGHUP: + r.doReload = true + case syscall.SIGINT, syscall.SIGTERM: + r.doExit = true + } + } + }() + + defer r.Close() + + return r.Run() +} diff --git a/rcap/writer.go b/rcap/writer.go index 1d240d1..db05e53 100644 --- a/rcap/writer.go +++ b/rcap/writer.go @@ -131,7 +131,7 @@ func (w *Writer) openWriter(ts int64) error { return err } - log.Printf("dump packets into a file: %v", fileName) + log.Printf("dump packets into a file: %v (append: %v)", fileName, !isNewFile) // Make a new writer. writer := pcapgo.NewWriter(file)