Skip to content

Commit

Permalink
Merge pull request #30 from kpetremann/choose_ipc_file
Browse files Browse the repository at this point in the history
feat: choose ipc file
  • Loading branch information
kpetremann committed Jun 16, 2023
2 parents 700152b + afb3632 commit a384854
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 7 deletions.
3 changes: 3 additions & 0 deletions internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"github.com/rs/zerolog/log"
)

// ConfigureLogging configures the logger
//
// logLevel: The log level to use, in zerolog format
func ConfigureLogging(logLevel string) {
level, err := zerolog.ParseLevel(logLevel)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions internal/metrics/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package metrics

type MetricsConfig struct {
HealthMinions bool
// HealtMinions enable/disable the health functions/states metrics
HealthMinions bool

// HealthFunctionsFilter permits to limit the number of function exposed
HealthFunctionsFilters []string
HealthStatesFilters []string

// HealthFunctionsFilter permits to limit the number of state exposed
HealthStatesFilters []string
}
9 changes: 9 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type SaltEvent struct {
RawBody []byte
}

// RawToJSON converts raw body to JSON
//
// If indent is true, the JSON will be indented
func (e SaltEvent) RawToJSON(indent bool) ([]byte, error) {
if e.RawBody == nil {
return nil, errors.New("raw body not registered")
Expand All @@ -56,6 +59,7 @@ func (e SaltEvent) RawToJSON(indent bool) ([]byte, error) {
}
}

// RawToYAML converts raw body to YAML
func (e SaltEvent) RawToYAML() ([]byte, error) {
if e.RawBody == nil {
return nil, errors.New("raw body not registered")
Expand All @@ -69,6 +73,7 @@ func (e SaltEvent) RawToYAML() ([]byte, error) {
return yaml.Marshal(data)
}

// extractStateFromArgs extracts embedded state info
func extractStateFromArgs(args interface{}, key string) string {
// args only
if v, ok := args.(string); ok {
Expand Down Expand Up @@ -111,6 +116,10 @@ func (e *SaltEvent) ExtractState() string {
return ""
}

// ParseEvent parses a salt event
//
// Once parsed, the message is sent to the eventChan channel.
// KeepRawBody is used to keep the raw body of the event.
func ParseEvent(message map[string]interface{}, eventChan chan<- SaltEvent, keepRawBody bool) {
body := string(message["body"].([]byte))
lines := strings.SplitN(body, "\n\n", 2)
Expand Down
42 changes: 37 additions & 5 deletions pkg/events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,27 @@ import (
"github.com/vmihailenco/msgpack/v5"
)

const defaultIPCFilepath = "/var/run/salt/master/master_event_pub.ipc"

// EventListener listens to the salt-master event bus and sends events to the event channel
type EventListener struct {
ctx context.Context
eventChan chan SaltEvent
// ctx specificies the context used mainly for cancellation
ctx context.Context

// eventChan is the channel to send events to
eventChan chan SaltEvent

// iPCFilepath is filepath to the salt-master event bus
iPCFilepath string

// saltEventBus keeps the connection to the salt-master event bus
saltEventBus net.Conn
decoder *msgpack.Decoder

// decoder is msgpack decoder for parsing the event bus messages
decoder *msgpack.Decoder
}

// Open opens the salt-master event bus
func (e *EventListener) Open() {
log.Info().Msg("connecting to salt-master event bus")
var err error
Expand All @@ -28,7 +42,7 @@ func (e *EventListener) Open() {
default:
}

e.saltEventBus, err = net.Dial("unix", "/var/run/salt/master/master_event_pub.ipc")
e.saltEventBus, err = net.Dial("unix", e.iPCFilepath)
if err != nil {
log.Error().Msg("failed to connect to event bus, retrying in 5 seconds")
time.Sleep(time.Second * 5)
Expand All @@ -40,6 +54,7 @@ func (e *EventListener) Open() {
}
}

// Close closes the salt-master event bus
func (e *EventListener) Close() error {
log.Info().Msg("disconnecting from salt-master event bus")
if e.saltEventBus != nil {
Expand All @@ -49,6 +64,7 @@ func (e *EventListener) Close() error {
}
}

// Reconnect reconnects to the salt-master event bus
func (e *EventListener) Reconnect() {
select {
case <-e.ctx.Done():
Expand All @@ -59,11 +75,27 @@ func (e *EventListener) Reconnect() {
}
}

// NewEventListener creates a new EventListener
//
// The events will be sent to eventChan.
func NewEventListener(ctx context.Context, eventChan chan SaltEvent) *EventListener {
e := EventListener{ctx: ctx, eventChan: eventChan}
e := EventListener{ctx: ctx, eventChan: eventChan, iPCFilepath: defaultIPCFilepath}
return &e
}

// SetIPCFilepath sets the filepath to the salt-master event bus
//
// The IPC file must be readable by the user running the exporter.
//
// Default: /var/run/salt/master/master_event_pub.ipc
func (e *EventListener) SetIPCFilepath(filepath string) {
e.iPCFilepath = filepath
}

// ListenEvents listens to the salt-master event bus and sends events to the event channel
//
// if keepRawBody is true, the raw event body will be kept in the event struct.
// It can be useful for debugging or post-processing.
func (e *EventListener) ListenEvents(keepRawBody bool) {
e.Open()

Expand Down

0 comments on commit a384854

Please sign in to comment.