Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/server/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
jobRunCompiler "github.com/odpf/optimus/compiler"
"github.com/odpf/optimus/config"
"github.com/odpf/optimus/datastore"
"github.com/odpf/optimus/ext/notify/pagerduty"
"github.com/odpf/optimus/ext/notify/slack"
"github.com/odpf/optimus/job"
"github.com/odpf/optimus/models"
Expand Down Expand Up @@ -278,6 +279,14 @@ func (s *OptimusServer) setupHandlers() error {
s.logger.Error("slack error accumulator", "error", err)
},
),
"pagerduty": pagerduty.NewNotifier(
notificationContext,
pagerduty.DefaultEventBatchInterval,
func(err error) {
s.logger.Error("pagerduty error accumulator", "error", err)
},
new(pagerduty.PagerDutyServiceImpl),
),
})

jobDeploymentRepository := postgres.NewJobDeploymentRepository(s.dbConn)
Expand Down
4 changes: 3 additions & 1 deletion docs/docs/guides/alerts.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ behavior:
duration : 2h45m
channels:
- slack://#slack-channel or @team-group or user&gmail.com
- pagerduty://#pagerduty_service_name
```

* sla_miss expects a duration config expect users to provide the duration as string.
* slack alerting is supported now which users can configure to channel or team handle or a specific user.
* slack alerting is supported now which users can configure to channel or team handle or a specific user.
* for pagerduty alerts to work, ```notify_<pagerduty_service_name>``` secret with pagerduty integration key/routing key needs to be registered with optimus.
5 changes: 4 additions & 1 deletion ext/notify/notify.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
package notify

import _ "github.com/odpf/optimus/ext/notify/slack"
import (
_ "github.com/odpf/optimus/ext/notify/pagerduty"
_ "github.com/odpf/optimus/ext/notify/slack"
)
129 changes: 129 additions & 0 deletions ext/notify/pagerduty/pagerdutynotifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package pagerduty

import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/odpf/optimus/models"
)

const (
DefaultEventBatchInterval = time.Second * 10
)

var (
pagerdutyQueueCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "notify_pagerduty_queue",
Help: "Items queued in pagerduty notification channel",
})
pagerdutyWorkerBatchCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "notify_pagerduty_worker_batch",
Help: "Worker execution count in pagerduty notification channel",
})
pagerdutyWorkerSendErrCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "notify_pagerduty_worker_send_err",
Help: "Failure of messages in pagerduty notification channel worker",
})
)

type Notifier struct {
io.Closer
msgQueue []Event
wg sync.WaitGroup
mu sync.Mutex
workerErrChan chan error
pdService PagerDutyService
eventBatchInterval time.Duration
}

type Event struct {
routingKey string
projectName string
namespaceName string
jobName string
owner string
meta models.JobEvent
}

func (s *Notifier) Notify(_ context.Context, attr models.NotifyAttrs) error {
routingKey, ok := attr.Namespace.ProjectSpec.Secret.GetByName(strings.ReplaceAll(attr.Route, "#", "notify_"))
if !ok {
return fmt.Errorf("failed to find authentication token of bot required for sending notifications, please register %s secret", strings.ReplaceAll(attr.Route, "#", "notify_"))
}
s.queueNotification(routingKey, attr)
return nil
}

func (s *Notifier) queueNotification(routingKey string, attr models.NotifyAttrs) {
s.mu.Lock()
defer s.mu.Unlock()
evt := Event{
routingKey: routingKey,
projectName: attr.Namespace.ProjectSpec.Name,
namespaceName: attr.Namespace.Name,
jobName: attr.JobSpec.Name,
owner: attr.JobSpec.Owner,
meta: attr.JobEvent,
}
s.msgQueue = append(s.msgQueue, evt)
pagerdutyQueueCounter.Inc()
}

func (s *Notifier) Worker(ctx context.Context) {
defer s.wg.Done()

for {
s.mu.Lock()
for _, evt := range s.msgQueue {
err := s.pdService.SendAlert(ctx, evt)
if err != nil {
s.workerErrChan <- fmt.Errorf("Worker_SendMessageContext: %w", err)
}
}
s.msgQueue = nil // empty the queue
s.mu.Unlock()

pagerdutyWorkerBatchCounter.Inc()
select {
case <-ctx.Done():
close(s.workerErrChan)
return
default:
time.Sleep(s.eventBatchInterval)
}
}
}

func (s *Notifier) Close() error { // nolint: unparam
// drain batches
s.wg.Wait()
return nil
}

func NewNotifier(ctx context.Context, eventBatchInterval time.Duration, errHandler func(error), pdService PagerDutyService) *Notifier {
notifier := &Notifier{
msgQueue: make([]Event, 0),
workerErrChan: make(chan error),
eventBatchInterval: eventBatchInterval,
pdService: pdService,
}

notifier.wg.Add(1)
go func() {
for err := range notifier.workerErrChan {
errHandler(err)
pagerdutyWorkerSendErrCounter.Inc()
}
notifier.wg.Done()
}()
notifier.wg.Add(1)
go notifier.Worker(ctx)
return notifier
}
140 changes: 140 additions & 0 deletions ext/notify/pagerduty/pagerdutynotifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package pagerduty

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/protobuf/types/known/structpb"

"github.com/odpf/optimus/models"
)

type PagerDutyServiceImplMock struct {
mock.Mock
}

func (s *PagerDutyServiceImplMock) SendAlert(ctx context.Context, evt Event) error {
err := s.Called(ctx, evt).Error(0)
return err
}

func TestPagerDuty(t *testing.T) {
t.Run("should send alert to pagerduty service using service name successfully", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var sendErrors []error

pagerDutyServiceMock := new(PagerDutyServiceImplMock)
pagerDutyServiceMock.On("SendAlert", ctx,
Event{
routingKey: "test-token",
projectName: "foo",
namespaceName: "test",
jobName: "foo-job-spec",
owner: "",
meta: models.JobEvent{
Type: "failure",
Value: map[string]*structpb.Value(nil),
},
},
).Return(nil)
defer pagerDutyServiceMock.AssertExpectations(t)

client := NewNotifier(
ctx,
time.Millisecond*500,
func(err error) {
sendErrors = append(sendErrors, err)
},
pagerDutyServiceMock,
)
defer client.Close()

err := client.Notify(context.Background(), models.NotifyAttrs{
Namespace: models.NamespaceSpec{
Name: "test",
ProjectSpec: models.ProjectSpec{
Name: "foo",
Secret: []models.ProjectSecretItem{
{
Name: "optimus@test.com",
Value: "test-token",
},
},
},
},
JobSpec: models.JobSpec{
Name: "foo-job-spec",
},
JobEvent: models.JobEvent{
Type: models.JobEventTypeFailure,
},
Route: "optimus@test.com",
})

assert.Nil(t, err)
cancel()
assert.Nil(t, client.Close())
assert.Nil(t, sendErrors)
})

t.Run("should call error handler function for any error from pagerduty service ", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var sendErrors []error

pagerDutyServiceMock := new(PagerDutyServiceImplMock)
pagerDutyServiceMock.On("SendAlert", ctx,
Event{
routingKey: "test-invalid-token",
projectName: "foo",
namespaceName: "test",
jobName: "foo-job-spec",
owner: "",
meta: models.JobEvent{
Type: "failure",
Value: map[string]*structpb.Value(nil),
},
},
).Return(fmt.Errorf("invalid routing key test-invalid-token"))
defer pagerDutyServiceMock.AssertExpectations(t)

client := NewNotifier(
ctx,
time.Millisecond*500,
func(err error) {
sendErrors = append(sendErrors, err)
},
pagerDutyServiceMock,
)
defer client.Close()

client.Notify(context.Background(), models.NotifyAttrs{
Namespace: models.NamespaceSpec{
Name: "test",
ProjectSpec: models.ProjectSpec{
Name: "foo",
Secret: []models.ProjectSecretItem{
{
Name: "optimus@test.com",
Value: "test-invalid-token",
},
},
},
},
JobSpec: models.JobSpec{
Name: "foo-job-spec",
},
JobEvent: models.JobEvent{
Type: models.JobEventTypeFailure,
},
Route: "optimus@test.com",
})

cancel()
assert.Nil(t, client.Close())
assert.NotNil(t, sendErrors)
})
}
73 changes: 73 additions & 0 deletions ext/notify/pagerduty/pagerdutyservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package pagerduty

import (
"context"
"encoding/json"

"github.com/PagerDuty/go-pagerduty"
)

type PagerDutyService interface {
SendAlert(context.Context, Event) error
}

type PagerDutyServiceImpl struct {
}

type customDetails struct {
Owner string `json:"owner"`
Namespace string `json:"namespace"`
LogURL string `json:"log_url"`
JobURL string `json:"job_url"`
Exception string `json:"exception"`
Message string `json:"message"`
}

func buildPayloadCustomDetails(evt Event) (string, error) {
details := &customDetails{Owner: evt.owner, Namespace: evt.namespaceName}
if logURL, ok := evt.meta.Value["log_url"]; ok && logURL.GetStringValue() != "" {
details.LogURL = logURL.GetStringValue()
}
if jobURL, ok := evt.meta.Value["job_url"]; ok && jobURL.GetStringValue() != "" {
details.JobURL = jobURL.GetStringValue()
}
if exception, ok := evt.meta.Value["exception"]; ok && exception.GetStringValue() != "" {
details.Exception = exception.GetStringValue()
}
if message, ok := evt.meta.Value["message"]; ok && message.GetStringValue() != "" {
details.Message = message.GetStringValue()
}

det, err := json.Marshal(&details)
if err != nil {
return "", err
}
return string(det), nil
}

func (*PagerDutyServiceImpl) SendAlert(ctx context.Context, evt Event) error {
details, err := buildPayloadCustomDetails(evt)
if err != nil {
return err
}

payload := pagerduty.V2Payload{
Summary: "Optimus " + string(evt.meta.Type) + " " + evt.jobName,
Severity: "critical",
Source: evt.projectName,
Details: details,
}

e := pagerduty.V2Event{
RoutingKey: evt.routingKey,
Action: "trigger",
Payload: &payload,
}
_, err = pagerduty.ManageEventWithContext(ctx, e)

if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/AlecAivazis/survey/v2 v2.2.7
github.com/MakeNowJust/heredoc v1.0.0
github.com/Masterminds/sprig/v3 v3.2.2
github.com/PagerDuty/go-pagerduty v1.5.1
github.com/briandowns/spinner v1.18.0
github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0
github.com/emirpasic/gods v1.12.0
Expand Down
Loading