Skip to content

Commit

Permalink
ログ受信と通知処理を移植
Browse files Browse the repository at this point in the history
  • Loading branch information
twsnmp committed Jan 9, 2021
1 parent 2540548 commit a0d0434
Show file tree
Hide file tree
Showing 10 changed files with 1,028 additions and 5 deletions.
12 changes: 7 additions & 5 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ type DataStore struct {
influxc client.Client
muInfluxc sync.Mutex

protMap map[int]string
serviceMap map[string]string
geoip *geoip2.Reader
geoipMap map[string]string
ouiMap map[string]string
protMap map[int]string
serviceMap map[string]string
geoip *geoip2.Reader
geoipMap map[string]string
ouiMap map[string]string
logSize int64
compLogSize int64
}

const (
Expand Down
54 changes: 54 additions & 0 deletions datastore/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,60 @@ func (ds *DataStore) DeleteAIReesult(id string) error {
})
}

func (ds *DataStore) SaveLogBuffer(logBuffer []*LogEnt) {
if ds.db == nil {
log.Printf("saveLogBuffer DB Not open")
return
}
_ = ds.db.Batch(func(tx *bbolt.Tx) error {
syslog := tx.Bucket([]byte("syslog"))
netflow := tx.Bucket([]byte("netflow"))
ipfix := tx.Bucket([]byte("ipfix"))
trap := tx.Bucket([]byte("trap"))
arplog := tx.Bucket([]byte("arplog"))
for _, l := range logBuffer {
k := fmt.Sprintf("%016x", l.Time)
s, err := json.Marshal(l)
if err != nil {
return err
}
ds.logSize += int64(len(s))
if len(s) > 100 {
s = compressLog(s)
}
ds.compLogSize += int64(len(s))
switch l.Type {
case "syslog":
_ = syslog.Put([]byte(k), []byte(s))
case "netflow":
_ = netflow.Put([]byte(k), []byte(s))
case "ipfix":
_ = ipfix.Put([]byte(k), []byte(s))
case "trap":
_ = trap.Put([]byte(k), []byte(s))
case "arplog":
_ = arplog.Put([]byte(k), []byte(s))
}
}
return nil
})
}

func compressLog(s []byte) []byte {
var b bytes.Buffer
f, _ := flate.NewWriter(&b, flate.DefaultCompression)
if _, err := f.Write(s); err != nil {
return s
}
if err := f.Flush(); err != nil {
return s
}
if err := f.Close(); err != nil {
return s
}
return b.Bytes()
}

func deCompressLog(s []byte) []byte {
r := flate.NewReader(bytes.NewBuffer(s))
d, err := ioutil.ReadAll(r)
Expand Down
6 changes: 6 additions & 0 deletions datastore/mapdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (ds *DataStore) FindNodeFromIP(ip string) *NodeEnt {
return ret
}

func (ds *DataStore) ForEachNodes(f func(*NodeEnt) bool) {
ds.Nodes.Range(func(_, p interface{}) bool {
return f(p.(*NodeEnt))
})
}

func (ds *DataStore) AddLine(l *LineEnt) error {
for {
l.ID = makeKey()
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/rakyll/statik v0.1.7
github.com/robertkrimen/otto v0.0.0-20200922221731-ef014fd054ac
github.com/signalsciences/ipv4 v1.4.0
github.com/tehmaze/netflow v0.0.0-20170921210347-852af103667f // indirect
github.com/twsnmp/go-mibdb v0.0.0-20210104220414-91387072cee7
github.com/twsnmp/gosnmp v1.28.4
github.com/vjeantet/grok v1.0.1
Expand All @@ -26,5 +27,6 @@ require (
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/mcuadros/go-syslog.v2 v2.3.0 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tehmaze/netflow v0.0.0-20170921210347-852af103667f h1:5dYJ4SDGn0SYo/BSXCDTf7W+LeUDj7v+VwXX723Kymw=
github.com/tehmaze/netflow v0.0.0-20170921210347-852af103667f/go.mod h1:QRP5wJOf7gGMGL2fCAfmh/5CMZQspRxT5DqghaPRrjM=
github.com/twsnmp/go-mibdb v0.0.0-20210104220414-91387072cee7 h1:FXtkgH4JQh+RygmxCq+P1J+ik61MaoKmZpu/8jMzfw0=
github.com/twsnmp/go-mibdb v0.0.0-20210104220414-91387072cee7/go.mod h1:fqYxYtdzPjVkqu1Dafev4I98LrT/Eg8F0DAnAcnDkkU=
github.com/twsnmp/gosnmp v1.28.4 h1:LCktx4hUAfMKPdGommsGWgiqSkp7ieEk6NThkGr/3yk=
Expand Down Expand Up @@ -113,6 +115,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQ
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/mcuadros/go-syslog.v2 v2.3.0 h1:kcsiS+WsTKyIEPABJBJtoG0KkOS6yzvJ+/eZlhD79kk=
gopkg.in/mcuadros/go-syslog.v2 v2.3.0/go.mod h1:l5LPIyOOyIdQquNg+oU6Z3524YwrcqEm0aKH+5zpt2U=
gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI=
gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
106 changes: 106 additions & 0 deletions logger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Package logger : ログ受信処理
package logger

/*
syslog,tarp,netflow5,ipfixをログに記録する
*/

import (
"context"
"log"

"time"

"github.com/twsnmp/twsnmpfc/datastore"
"github.com/twsnmp/twsnmpfc/report"
)

type Logger struct {
ds *datastore.DataStore
report *report.Report
logCh chan *datastore.LogEnt
}

func NewLogger(ctx context.Context, ds *datastore.DataStore, r *report.Report) *Logger {
l := &Logger{
ds: ds,
report: r,
logCh: make(chan *datastore.LogEnt, 100),
}
l.logger(ctx)
return l
}

func (l *Logger) logger(ctx context.Context) {
var syslogdRunning = false
var trapdRunning = false
var netflowdRunning = false
var stopSyslogd chan bool
var stopTrapd chan bool
var stopNetflowd chan bool
timer := time.NewTicker(time.Second * 10)
logBuffer := []*datastore.LogEnt{}
for {
select {
case <-ctx.Done():
{
timer.Stop()
if len(logBuffer) > 0 {
l.ds.SaveLogBuffer(logBuffer)
}
if syslogdRunning {
close(stopSyslogd)
}
if netflowdRunning {
close(stopNetflowd)
}
if trapdRunning {
close(stopTrapd)
}
log.Printf("Stop logger")
return
}
case log := <-l.logCh:
{
logBuffer = append(logBuffer, log)
}
case <-timer.C:
{
if len(logBuffer) > 0 {
l.ds.SaveLogBuffer(logBuffer)
logBuffer = []*datastore.LogEnt{}
}
if l.ds.MapConf.EnableSyslogd && !syslogdRunning {
stopSyslogd = make(chan bool)
syslogdRunning = true
go l.syslogd(stopSyslogd)
log.Printf("start syslogd")
} else if !l.ds.MapConf.EnableSyslogd && syslogdRunning {
close(stopSyslogd)
syslogdRunning = false
log.Printf("stop syslogd")
}
if l.ds.MapConf.EnableTrapd && !trapdRunning {
stopTrapd = make(chan bool)
trapdRunning = true
go l.snmptrapd(stopTrapd)
log.Printf("start trapd")
} else if !l.ds.MapConf.EnableTrapd && trapdRunning {
close(stopTrapd)
trapdRunning = false
log.Printf("stop trapd")
}
if l.ds.MapConf.EnableNetflowd && !netflowdRunning {
stopNetflowd = make(chan bool)
netflowdRunning = true
go l.netflowd(stopNetflowd)
log.Printf("start netflowd")
} else if !l.ds.MapConf.EnableNetflowd && netflowdRunning {
close(stopNetflowd)
netflowdRunning = false
log.Printf("stop netflowd")
}
}
}
}
}

0 comments on commit a0d0434

Please sign in to comment.