Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactoring to remove pubsub flags to improve experience #3339

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ ifeq ($(ENABLE_PUBSUB),true)
--set disabledBuiltins={http.send} \
--set logMutations=true \
--set audit.enablePubsub=${ENABLE_PUBSUB} \
--set audit.connection=${AUDIT_CONNECTION} \
--set audit.channel=${AUDIT_CHANNEL} \
--set-string auditPodAnnotations.dapr\\.io/enabled=true \
--set-string auditPodAnnotations.dapr\\.io/app-id=audit \
--set-string auditPodAnnotations.dapr\\.io/metrics-port=9999 \
Expand Down
2 changes: 0 additions & 2 deletions cmd/build/helmify/replacements.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ var replacements = map[string]string{

"- HELMSUBST_PUBSUB_ARGS": `{{ if .Values.audit.enablePubsub}}
- --enable-pub-sub={{ .Values.audit.enablePubsub }}
- --audit-connection={{ .Values.audit.connection }}
- --audit-channel={{ .Values.audit.channel }}
{{- end }}`,

"HELMSUBST_MUTATING_WEBHOOK_FAILURE_POLICY": `{{ .Values.mutatingWebhookFailurePolicy }}`,
Expand Down
4 changes: 1 addition & 3 deletions cmd/build/helmify/static/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,7 @@ controllerManager:
# - ipBlock:
# cidr: 0.0.0.0/0
audit:
enablePubsub: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you removed the wrong value here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yip! I will update that.

connection: audit-connection
channel: audit-channel
enablePubSub: false
hostNetwork: false
dnsPolicy: ClusterFirst
metricsPort: 8888
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ spec:
- --operation=status
{{ if .Values.audit.enablePubsub}}
- --enable-pub-sub={{ .Values.audit.enablePubsub }}
- --audit-connection={{ .Values.audit.connection }}
- --audit-channel={{ .Values.audit.channel }}
{{- end }}
{{ if not .Values.disableMutation}}- --operation=mutation-status{{- end }}
- --logtostderr
Expand Down
4 changes: 1 addition & 3 deletions manifest_staging/charts/gatekeeper/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,7 @@ controllerManager:
# - ipBlock:
# cidr: 0.0.0.0/0
audit:
enablePubsub: false
connection: audit-connection
channel: audit-channel
enablePubSub: false
hostNetwork: false
dnsPolicy: ClusterFirst
metricsPort: 8888
Expand Down
4 changes: 1 addition & 3 deletions pkg/audit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ var (
auditEventsInvolvedNamespace = flag.Bool("audit-events-involved-namespace", false, "emit audit events for each violation in the involved objects namespace, the default (false) generates events in the namespace Gatekeeper is installed in. Audit events from cluster-scoped resources will still follow the default behavior")
auditMatchKindOnly = flag.Bool("audit-match-kind-only", false, "only use kinds specified in all constraints for auditing cluster resources. if kind is not specified in any of the constraints, it will audit all resources (same as setting this flag to false)")
apiCacheDir = flag.String("api-cache-dir", defaultAPICacheDir, "The directory where audit from api server cache are stored, defaults to /tmp/audit")
auditConnection = flag.String("audit-connection", defaultConnection, "Connection name for publishing audit violation messages. Defaults to audit-connection")
auditChannel = flag.String("audit-channel", defaultChannel, "Channel name for publishing audit violation messages. Defaults to audit-channel")
emptyAuditResults = newLimitQueue(0)
logStatsAudit = flag.Bool("log-stats-audit", false, "(alpha) log stats metrics for the audit run")
)
Expand Down Expand Up @@ -901,7 +899,7 @@ func (am *Manager) addAuditResponsesToUpdateLists(
labels := r.obj.GetLabels()
logViolation(am.log, constraint, ea, gvk, namespace, name, msg, details, labels)
if *pubsubController.PubsubEnabled {
err := am.pubsubSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, gvk, namespace, name, msg, details, labels, timestamp))
err := am.pubsubSystem.Publish(context.Background(), violationMsg(constraint, ea, gvk, namespace, name, msg, details, labels, timestamp))
if err != nil {
am.log.Error(err, "pubsub audit Publishing")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pubsub/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// PubSub is the interface that wraps pubsub methods.
type Connection interface {
// Publish single message over a specific topic/channel
Publish(ctx context.Context, data interface{}, topic string) error
Publish(ctx context.Context, data interface{}) error

// Close connections
CloseConnection() error
Expand Down
20 changes: 18 additions & 2 deletions pkg/pubsub/dapr/dapr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
type ClientConfig struct {
// Name of the component to be used for pub sub messaging
Component string `json:"component"`

// Topic where the messages would be published for the connection
Topic string `json:"topic"`
}

// Dapr represents driver for interacting with pub sub using dapr.
Expand All @@ -21,19 +24,22 @@ type Dapr struct {

// Name of the pubsub component
pubSubComponent string

// Topic where the messages would be published for the connection
topic string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What part of the connection/channel config should be specify-able by downstream users (e.g. constraint authors) and what should be owned by the author of the constraint objects?

We should think about the personas who would be interacting with these knobs. Are the people setting up the infra always the same people writing the policy?

}

const (
Name = "dapr"
)

func (r *Dapr) Publish(_ context.Context, data interface{}, topic string) error {
func (r *Dapr) Publish(_ context.Context, data interface{}) error {
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

err = r.client.PublishEvent(context.Background(), r.pubSubComponent, topic, jsonData)
err = r.client.PublishEvent(context.Background(), r.pubSubComponent, r.topic, jsonData)
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}
Expand All @@ -56,6 +62,11 @@ func (r *Dapr) UpdateConnection(_ context.Context, config interface{}) error {
return fmt.Errorf("failed to get value of component")
}
r.pubSubComponent = cfg.Component
cfg.Topic, ok = m["topic"].(string)
if !ok {
return fmt.Errorf("failed to get value of topic")
}
r.topic = cfg.Topic
return nil
}

Expand All @@ -70,6 +81,10 @@ func NewConnection(_ context.Context, config interface{}) (connection.Connection
if !ok {
return nil, fmt.Errorf("failed to get value of component")
}
cfg.Topic, ok = m["topic"].(string)
if !ok {
return nil, fmt.Errorf("failed to get value of topic")
}

tmp, err := daprClient.NewClient()
if err != nil {
Expand All @@ -79,5 +94,6 @@ func NewConnection(_ context.Context, config interface{}) (connection.Connection
return &Dapr{
client: tmp,
pubSubComponent: cfg.Component,
topic: cfg.Topic,
}, nil
}
25 changes: 6 additions & 19 deletions pkg/pubsub/dapr/dapr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ func TestDapr_Publish(t *testing.T) {
ctx := context.Background()

type args struct {
ctx context.Context
data interface{}
topic string
ctx context.Context
data interface{}
}

tests := []struct {
Expand All @@ -72,35 +71,22 @@ func TestDapr_Publish(t *testing.T) {
data: map[string]interface{}{
"test": "test",
},
topic: "test",
},
wantErr: false,
},
{
name: "test publish without data",
args: args{
ctx: ctx,
data: nil,
topic: "test",
ctx: ctx,
data: nil,
},
wantErr: false,
},
{
name: "test publish without topic",
args: args{
ctx: ctx,
data: map[string]interface{}{
"test": "test",
},
topic: "",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := testClient
if err := r.Publish(tt.args.ctx, tt.args.data, tt.args.topic); (err != nil) != tt.wantErr {
if err := r.Publish(tt.args.ctx, tt.args.data); (err != nil) != tt.wantErr {
t.Errorf("Dapr.Publish() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand All @@ -117,6 +103,7 @@ func TestDapr_UpdateConnection(t *testing.T) {
name: "test update connection",
config: map[string]interface{}{
"component": "foo",
"topic": "bar",
},
wantErr: false,
},
Expand Down
11 changes: 10 additions & 1 deletion pkg/pubsub/dapr/fake_dapr_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func FakeConnection() (connection.Connection, func()) {
return &Dapr{
client: c,
pubSubComponent: "test",
topic: "test",
}, f
}

Expand All @@ -338,11 +339,14 @@ type FakeDapr struct {
// Name of the pubsub component
pubSubComponent string

// Name of the topic
topic string

// closing function
f func()
}

func (r *FakeDapr) Publish(_ context.Context, _ interface{}, _ string) error {
func (r *FakeDapr) Publish(_ context.Context, _ interface{}) error {
return nil
}

Expand Down Expand Up @@ -376,12 +380,17 @@ func FakeNewConnection(ctx context.Context, config interface{}) (connection.Conn
if !ok {
return nil, fmt.Errorf("failed to get value of component")
}
cfg.Topic, ok = m["topic"].(string)
if !ok {
return nil, fmt.Errorf("failed to get value of topic")
}

c, f := getTestClient(ctx)

return &FakeDapr{
client: c,
pubSubComponent: cfg.Component,
f: f,
topic: cfg.Topic,
}, nil
}
18 changes: 11 additions & 7 deletions pkg/pubsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"errors"
"fmt"
"sync"

Expand All @@ -19,16 +20,19 @@ func NewSystem() *System {
return &System{}
}

func (s *System) Publish(_ context.Context, connection string, topic string, msg interface{}) error {
func (s *System) Publish(ctx context.Context, msg interface{}) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we still looking at supporting multiple queues simultaneously? Possibly specifying which connection/topic gets pushed to as part of a constraint's enforcement action?

Example: send violation X to security queue, violation Y to ops queue?

If so, Publish() will need to support something more fine-grained than "broadcast everywhere"

s.mux.RLock()
defer s.mux.RUnlock()
if len(s.connections) > 0 {
if c, ok := s.connections[connection]; ok {
return c.Publish(context.Background(), msg, topic)
}
return fmt.Errorf("connection is not initialized, name: %s ", connection)
var errs error

if len(s.connections) == 0 {
return fmt.Errorf("no connections are established")
}

for _, c := range s.connections {
errs = errors.Join(errs, c.Publish(ctx, msg))
}
return fmt.Errorf("No connections are established")
return errs
}

func (s *System) UpsertConnection(ctx context.Context, config interface{}, name string, provider string) error {
Expand Down
15 changes: 5 additions & 10 deletions pkg/pubsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestMain(m *testing.M) {
cfg := map[string]interface{}{
dapr.Name: map[string]interface{}{
"component": "pubsub",
"topic": "audit",
},
}
for name, fakeConn := range tmp {
Expand Down Expand Up @@ -90,6 +91,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
ctx: context.Background(),
config: map[string]interface{}{
"component": "pubsub",
"topic": "test",
},
name: "dapr",
provider: "dapr",
Expand All @@ -111,6 +113,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
ctx: context.Background(),
config: map[string]interface{}{
"component": "pubsub",
"topic": "test",
},
name: "audit",
provider: "test",
Expand All @@ -133,6 +136,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
ctx: context.Background(),
config: map[string]interface{}{
"component": "test",
"topic": "audit",
},
name: "audit",
provider: "dapr",
Expand Down Expand Up @@ -222,15 +226,6 @@ func TestSystem_Publish(t *testing.T) {
args: args{ctx: context.Background(), connection: "audit", topic: "test", msg: nil},
wantErr: true,
},
{
name: "Publishing to a connection that does not exist",
fields: fields{
connections: map[string]connection.Connection{"audit": &dapr.Dapr{}},
providers: map[string]string{"audit": "dapr"},
},
args: args{ctx: context.Background(), connection: "test", topic: "test", msg: nil},
wantErr: true,
},
{
name: "Publishing to a connection that does exist",
fields: fields{
Expand All @@ -248,7 +243,7 @@ func TestSystem_Publish(t *testing.T) {
connections: tt.fields.connections,
providers: tt.fields.providers,
}
if err := s.Publish(tt.args.ctx, tt.args.connection, tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr {
if err := s.Publish(tt.args.ctx, tt.args.msg); (err != nil) != tt.wantErr {
t.Errorf("System.Publish() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
3 changes: 2 additions & 1 deletion test/pubsub/publish-components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ data:
provider: "dapr"
config: |
{
"component": "pubsub"
"component": "pubsub",
"topic": "audit-channel"
}