Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

Commit

Permalink
[medium] Add persistent module support in agent + audit persistent mo…
Browse files Browse the repository at this point in the history
…dule
  • Loading branch information
arunk-s committed Jul 18, 2016
1 parent a06734a commit 25c6d6a
Show file tree
Hide file tree
Showing 5 changed files with 1,058 additions and 0 deletions.
1 change: 1 addition & 0 deletions conf/available_modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package main

import (
_ "mig.ninja/mig/modules/agentdestroy"
_ "mig.ninja/mig/modules/audit"
_ "mig.ninja/mig/modules/file"
_ "mig.ninja/mig/modules/memory"
_ "mig.ninja/mig/modules/netstat"
Expand Down
3 changes: 3 additions & 0 deletions conf/mig-agent-conf.go.inc
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,6 @@ MIIEpAIBAAKCAQEAvJQqCjE4I63S3kR9KV0EG9e/lX/bZxa/2QVvZGi9/Suj65nD
........
RMSEpg+wuIVnKUi6KThiMKyXfZaTX7BDuR/ezE/JHs1TN5Hkw43TCQ==
-----END RSA PRIVATE KEY-----`)

//PERSISTENTMODULES contains module names and params that are to be started when the agent starts initially
var PERSISTENTMODULES = [][]string {{"audit",`{"class":"parameters", "parameters":{"rulefilepath": "audit.rules.json", "outputsockets": ["/path/to/unix.sock"], "serveraddress": "mozdef_url"}}`}}
203 changes: 203 additions & 0 deletions mig-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ type moduleOp struct {
expireafter time.Time
}

//struct to be used when sending messages to a persistent module process
type persistentModuleMsg struct {
serial int
msg modules.Message
responseChan chan []byte
}

var runningPersistentMods = make(map[string]chan persistentModuleMsg)

var runningOps = make(map[float64]moduleOp)

func main() {
Expand Down Expand Up @@ -368,6 +377,24 @@ func runAgent(runOpt runtimeOptions) (err error) {
// service should simply be stopped.
exitReason = <-ctx.Channels.Terminate
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Shutting down agent: '%v'", exitReason)}.Emerg()
// send stop messags to existing persistent modules
if len(runningPersistentMods) > 0 {
for modName := range runningPersistentMods {
msg := modules.Message{Class: modules.MsgClassStop, Parameters: nil}
response := make(chan []byte)
requestMsg := persistentModuleMsg{msg: msg, responseChan: response}
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("sending stop signal to persistent module: %q", modName)}
runningPersistentMods[modName] <- requestMsg
select {
// wait for a specified time for persistent module process
case <-time.After(time.Second * 3):
break
case <-response:
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module: %q shut down", modName)}
}
}
}

time.Sleep(time.Second) // give a chance for work in progress to finish before we lock up

// lock publication forever, we're shutting down
Expand Down Expand Up @@ -457,6 +484,182 @@ func startRoutines(ctx *Context) (err error) {
ctx.Channels.Log <- mig.Log{Desc: "periodic environment refresh is disabled"}
}

// check for persistent modules, if enabled parse their config options
if len(PERSISTENTMODULES) > 0 {
for _, moduleString := range PERSISTENTMODULES {
moduleName := moduleString[0]
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Enabled persistent module %q", moduleName)}
//parse moduleparams from the config file
//module params are passed as JSON MsgClassParameters Message
moduleParams := moduleString[1]
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("Feeding message to persistent module %q", moduleParams)}
// feed params to peristent module
err = startPersistentModule(ctx, moduleName, moduleParams)
if err != nil {
return
}
}
}
return
}

// launch persistent module as a subprocess and open pipes to stdin and stdout
// for communicating with the process
// starts go routines that listens and feeds the subprocess
func startPersistentModule(ctx *Context, moduleName, params string) (err error) {
var isTimeOut = true
//buffered channel may be ? design decision
stdOutChannel := make(chan []byte)
supervisorChan := make(chan persistentModuleMsg)
cmd := exec.Command(ctx.Agent.BinPath, "-m", moduleName)
// stdin, stdout are used for communicating with the persistent module process
// the flow is full duplex
// open pipes for communicating with process stdin and stdout
stdinPipe, err := cmd.StdinPipe()
if err != nil {
return err
}
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return err
}
stdoutScanner := bufio.NewScanner(stdoutPipe)
//append the module to runningPersistentMods
runningPersistentMods[moduleName] = supervisorChan
// start a go routine that continously scans on process stdout
// and as soon as any data arrives, take appropriate action
// caveat: scanner will always split the input by newlines, unless we provide our own split function.
// so the persistent module should always end its message by \n
go func() {
for stdoutScanner.Scan() {
stdOutChannel <- stdoutScanner.Bytes()
}
}()

if err := cmd.Start(); err != nil {
return err
}

modParams := []byte(params)
modParams = append(modParams, []byte("\n")...)
//start a go-routine to feed params to persistent module process
go func() {
left := len(modParams)
for left > 0 {
nb, err := stdinPipe.Write(modParams)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%q: stdin Write failed", moduleName)}.Err()
stdinPipe.Close()
return
}
left -= nb
modParams = modParams[nb:]
}
}()

waiter := make(chan error, 1)
go func() {
waiter <- cmd.Wait()
}()
// go routine to continously process messages out of stdoutChannel, monitor process waiter and apply
// timeout mechanism, if the process doesn't send a hearbeat in 30 seconds, it kills the process
// or if process died or failed due to some error, sends signal to shutdown the agent
timeOutTicker := time.Tick(30 * time.Second)
go func() {
var openReqs = make(map[int]*chan []byte)
loop:
for {
select {
case err := <-waiter:
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q failed with %q", moduleName, err)}.Err()
ctx.Channels.Terminate <- fmt.Sprintf("persistent module %q failed", moduleName)
} else {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q exited.", moduleName)}
}
delete(runningPersistentMods, moduleName)
break loop
// all messages on stdout should be of MessageClass
// parse message, check for hearbeat messages, reset timeout counter
case msg := <-stdOutChannel:
infd := bufio.NewReader(bytes.NewBuffer(msg))
newMessage, err := modules.ReadInput(infd)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q : error while reading %q", moduleName, err)}.Err()
continue
}
// TODO: check mapping between the incoming response and the requests outstanding
// and signal them via their response channels
if newMessage.Class == modules.MsgClassHeartbeat {
//reset time counter
isTimeOut = false
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%q is alive", moduleName)}.Debug()

} else if newMessage.Class == modules.MsgClassParameters {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%q: New Config loaded.", moduleName)}
} else if newMessage.Class == modules.MsgClassLog {
var serial int
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q: %q", moduleName, msg)}
rawParams, err := json.Marshal(newMessage.Parameters)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q : error marshaling params %q", moduleName, err)}.Err()
continue
}
err = json.Unmarshal(rawParams, &serial)
if err != nil {
// ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q : error reading params %q", moduleName, err)}.Err()
continue
}
if responseChan, ok := openReqs[serial]; ok {
*responseChan <- []byte("1")
delete(openReqs, serial)
}
} else {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%q: %q", moduleName, msg)}
}
case <-timeOutTicker:
if isTimeOut {
//check status, kill process etc.
isTimeOut = true
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("killing persistent module %q", moduleName)}
// kill the module process
err := cmd.Process.Kill()
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q : %q", moduleName, err)}.Err()
}
<-waiter // allow go routine to exit
delete(runningPersistentMods, moduleName)
ctx.Channels.Terminate <- fmt.Sprintf("persistent module %q timed out", moduleName)
break loop // exit go routine
} else {
isTimeOut = true
}
case msgStruct := <-supervisorChan:
newMessage := msgStruct.msg
rawMsg, err := modules.MakeMessage(newMessage.Class, newMessage.Parameters, false)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("persistent module %q : %q", moduleName, err)}.Err()
continue
}
rawMsg = append(rawMsg, []byte("\n")...)
left := len(rawMsg)
for left > 0 {
nb, err := stdinPipe.Write(rawMsg)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%q: stdin Write failed", moduleName)}.Err()
stdinPipe.Close()
break
}
left -= nb
rawMsg = rawMsg[nb:]
}
// make a mapping between the incoming request & the response chan
// to later notify on the chan
openReqs[msgStruct.serial] = &msgStruct.responseChan
}
}
}()
ctx.Channels.Log <- mig.Log{Desc: fmt.Sprintf("%q: Launched", moduleName)}
return
}

Expand Down

0 comments on commit 25c6d6a

Please sign in to comment.