Auto-recover policy report (#1730)
* auto-recover policy report

Signed-off-by: Shuting Zhao <shutting06@gmail.com>

* add flag background-scan to tune this interval

Signed-off-by: Shuting Zhao <shutting06@gmail.com>

* cleanup webhook configurations when Kyverno deployment is deleted

Signed-off-by: Shuting Zhao <shutting06@gmail.com>

* reconcile policy reports if Kyverno Configmap changes

Signed-off-by: Shuting Zhao <shutting06@gmail.com>
realshuting committed Mar 25, 2021
1 parent 4d70013 commit fd9acf2
Showing 7 changed files with 237 additions and 68 deletions.
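Taken together, the changes wire up a single reconcile path: the policy report generator exposes a ReconcileCh channel, the ConfigMap watcher sends on it when resource filters change, and the policy controller drains it alongside a periodic background-scan ticker. The standalone sketch below only illustrates that flow with simplified names and types; it is not the actual Kyverno code.

package main

import (
    "fmt"
    "time"
)

// reconcileLoop mirrors the shape of PolicyController.forceReconciliation: it
// triggers a reconcile either on a fixed background-scan interval or when a
// signal arrives (for example, because the Kyverno ConfigMap changed).
func reconcileLoop(interval time.Duration, reconcileCh <-chan bool, stopCh <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            fmt.Println("background scan: erase report entries and requeue policies")
        case erase := <-reconcileCh:
            fmt.Println("reconcile signal received, erase =", erase)
        case <-stopCh:
            return
        }
    }
}

func main() {
    reconcileCh := make(chan bool, 1)
    stopCh := make(chan struct{})

    go reconcileLoop(100*time.Millisecond, reconcileCh, stopCh)

    // The ConfigMap watcher would do this when resourceFilters change.
    reconcileCh <- true

    time.Sleep(300 * time.Millisecond)
    close(stopCh)
}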
66 changes: 34 additions & 32 deletions cmd/kyverno/main.go
@@ -41,35 +41,36 @@ const resyncPeriod = 15 * time.Minute
var (
//TODO: this has been added to backward support command line arguments
// will be removed in future and the configuration will be set only via configmaps
filterK8sResources string
kubeconfig string
serverIP string
runValidationInMutatingWebhook string
excludeGroupRole string
excludeUsername string
profilePort string
filterK8sResources string
kubeconfig string
serverIP string
excludeGroupRole string
excludeUsername string
profilePort string

webhookTimeout int
genWorkers int

profile bool
policyReport bool
setupLog = log.Log.WithName("setup")

policyControllerResyncPeriod time.Duration
setupLog = log.Log.WithName("setup")
)

func main() {
klog.InitFlags(nil)
log.SetLogger(klogr.New())
flag.StringVar(&filterK8sResources, "filterK8sResources", "", "k8 resource in format [kind,namespace,name] where policy is not evaluated by the admission webhook. example --filterKind \"[Deployment, kyverno, kyverno]\" --filterKind \"[Deployment, kyverno, kyverno],[Events, *, *]\"")
flag.StringVar(&filterK8sResources, "filterK8sResources", "", "Resource in format [kind,namespace,name] where policy is not evaluated by the admission webhook. For example, --filterK8sResources \"[Deployment, kyverno, kyverno],[Events, *, *]\"")
flag.StringVar(&excludeGroupRole, "excludeGroupRole", "", "")
flag.StringVar(&excludeUsername, "excludeUsername", "", "")
flag.IntVar(&webhookTimeout, "webhooktimeout", 3, "timeout for webhook configurations")
flag.IntVar(&genWorkers, "gen-workers", 20, "workers for generate controller")
flag.IntVar(&webhookTimeout, "webhooktimeout", 3, "Timeout for webhook configurations")
flag.IntVar(&genWorkers, "gen-workers", 10, "Workers for generate controller")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
flag.StringVar(&runValidationInMutatingWebhook, "runValidationInMutatingWebhook", "", "Validation will also be done using the mutation webhook, set to 'true' to enable. Older kubernetes versions do not work properly when a validation webhook is registered.")
flag.BoolVar(&profile, "profile", false, "Set this flag to 'true', to enable profiling.")
flag.StringVar(&profilePort, "profile-port", "6060", "Enable profiling at given port, default to 6060.")
flag.DurationVar(&policyControllerResyncPeriod, "background-scan", time.Hour, "Perform background scan every given interval, e.g., 30s, 15m, 1h.")
if err := flag.Set("v", "2"); err != nil {
setupLog.Error(err, "failed to set log level")
os.Exit(1)
@@ -155,20 +156,7 @@ func main() {
// - ClusterPolicyReport, PolicyReport
// - GenerateRequest
// - ClusterReportChangeRequest, ReportChangeRequest
pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(pclient, resyncPeriod)

// Configuration Data
// dynamically load the configuration from configMap
// - resource filters
// if the configMap is update, the configuration will be updated :D
configData := config.NewConfigData(
kubeClient,
kubeInformer.Core().V1().ConfigMaps(),
filterK8sResources,
excludeGroupRole,
excludeUsername,
log.Log.WithName("ConfigData"),
)
pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(pclient, policyControllerResyncPeriod)

// EVENT GENERATOR
// - generate event with retry mechanism
@@ -185,10 +173,7 @@ func main() {
pInformer.Kyverno().V1().Policies().Lister())

// POLICY Report GENERATOR
// -- generate policy report
var reportReqGen *policyreport.Generator
var prgen *policyreport.ReportGenerator
reportReqGen = policyreport.NewReportChangeRequestGenerator(pclient,
reportReqGen := policyreport.NewReportChangeRequestGenerator(pclient,
client,
pInformer.Kyverno().V1alpha1().ReportChangeRequests(),
pInformer.Kyverno().V1alpha1().ClusterReportChangeRequests(),
@@ -198,7 +183,8 @@ func main() {
log.Log.WithName("ReportChangeRequestGenerator"),
)

prgen = policyreport.NewReportGenerator(client,
prgen := policyreport.NewReportGenerator(pclient,
client,
pInformer.Wgpolicyk8s().V1alpha1().ClusterPolicyReports(),
pInformer.Wgpolicyk8s().V1alpha1().PolicyReports(),
pInformer.Kyverno().V1alpha1().ReportChangeRequests(),
@@ -207,6 +193,20 @@ func main() {
log.Log.WithName("PolicyReportGenerator"),
)

// Configuration Data
// dynamically load the configuration from configMap
// - resource filters
// if the configMap is update, the configuration will be updated :D
configData := config.NewConfigData(
kubeClient,
kubeInformer.Core().V1().ConfigMaps(),
filterK8sResources,
excludeGroupRole,
excludeUsername,
prgen.ReconcileCh,
log.Log.WithName("ConfigData"),
)

// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
@@ -219,9 +219,11 @@ func main() {
configData,
eventGenerator,
reportReqGen,
prgen,
kubeInformer.Core().V1().Namespaces(),
log.Log.WithName("PolicyController"),
rCache,
policyControllerResyncPeriod,
)

if err != nil {
@@ -358,7 +360,7 @@ func main() {
go reportReqGen.Run(2, stopCh)
go prgen.Run(1, stopCh)
go configData.Run(stopCh)
go policyCtrl.Run(2, stopCh)
go policyCtrl.Run(2, prgen.ReconcileCh, stopCh)
go eventGenerator.Run(3, stopCh)
go grgen.Run(10, stopCh)
go grc.Run(genWorkers, stopCh)
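The new -background-scan flag is declared with flag.DurationVar, so it accepts Go duration strings such as 30s, 15m, or 1h and defaults to one hour. A minimal sketch of how such a flag parses, as a hypothetical standalone program rather than Kyverno's main.go:

package main

import (
    "flag"
    "fmt"
    "time"
)

func main() {
    // Mirrors flag.DurationVar(&policyControllerResyncPeriod, "background-scan", time.Hour, ...).
    var backgroundScan time.Duration
    flag.DurationVar(&backgroundScan, "background-scan", time.Hour, "Perform background scan every given interval, e.g., 30s, 15m, 1h.")
    flag.Parse()

    // Running with -background-scan=15m prints "15m0s".
    fmt.Println("background scan interval:", backgroundScan)
}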
33 changes: 19 additions & 14 deletions pkg/config/dynamicconfig.go
@@ -31,6 +31,7 @@ type ConfigData struct {
excludeUsername []string
restrictDevelopmentUsername []string
cmSycned cache.InformerSynced
reconcilePolicyReport chan<- bool
log logr.Logger
}

@@ -96,17 +97,18 @@ type Interface interface {
}

// NewConfigData ...
func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapInformer, filterK8sResources, excludeGroupRole, excludeUsername string, log logr.Logger) *ConfigData {
func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapInformer, filterK8sResources, excludeGroupRole, excludeUsername string, reconcilePolicyReport chan<- bool, log logr.Logger) *ConfigData {
// environment var is read at start only
if cmNameEnv == "" {
log.Info("ConfigMap name not defined in env:INIT_CONFIG: loading no default configuration")
}

cd := ConfigData{
client: rclient,
cmName: os.Getenv(cmNameEnv),
cmSycned: cmInformer.Informer().HasSynced,
log: log,
client: rclient,
cmName: os.Getenv(cmNameEnv),
cmSycned: cmInformer.Informer().HasSynced,
reconcilePolicyReport: reconcilePolicyReport,
log: log,
}

cd.restrictDevelopmentUsername = []string{"minikube-user", "kubernetes-admin"}
@@ -163,7 +165,11 @@ func (cd *ConfigData) updateCM(old, cur interface{}) {
return
}
// if data has not changed then dont load configmap
cd.load(*cm)
changed := cd.load(*cm)
if changed {
cd.log.Info("resource filters changed, sending reconcile signal to the policy controller")
cd.reconcilePolicyReport <- true
}
}

func (cd *ConfigData) deleteCM(obj interface{}) {
@@ -189,16 +195,16 @@ func (cd *ConfigData) deleteCM(obj interface{}) {
cd.unload(*cm)
}

func (cd *ConfigData) load(cm v1.ConfigMap) {
func (cd *ConfigData) load(cm v1.ConfigMap) (changed bool) {
logger := cd.log.WithValues("name", cm.Name, "namespace", cm.Namespace)
if cm.Data == nil {
logger.V(4).Info("configuration: No data defined in ConfigMap")
return
}
// parse and load the configuration

cd.mux.Lock()
defer cd.mux.Unlock()
// get resource filters

filters, ok := cm.Data["resourceFilters"]
if !ok {
logger.V(4).Info("configuration: No resourceFilters defined in ConfigMap")
@@ -208,12 +214,11 @@ func (cd *ConfigData) load(cm v1.ConfigMap) {
logger.V(4).Info("resourceFilters did not change")
} else {
logger.V(2).Info("Updated resource filters", "oldFilters", cd.filters, "newFilters", newFilters)
// update filters
cd.filters = newFilters
changed = true
}
}

// get resource filters
excludeGroupRole, ok := cm.Data["excludeGroupRole"]
if !ok {
logger.V(4).Info("configuration: No excludeGroupRole defined in ConfigMap")
@@ -224,11 +229,10 @@ func (cd *ConfigData) load(cm v1.ConfigMap) {
logger.V(4).Info("excludeGroupRole did not change")
} else {
logger.V(2).Info("Updated resource excludeGroupRoles", "oldExcludeGroupRole", cd.excludeGroupRole, "newExcludeGroupRole", newExcludeGroupRoles)
// update filters
cd.excludeGroupRole = newExcludeGroupRoles
changed = true
}

// get resource filters
excludeUsername, ok := cm.Data["excludeUsername"]
if !ok {
logger.V(4).Info("configuration: No excludeUsername defined in ConfigMap")
@@ -238,11 +242,12 @@ func (cd *ConfigData) load(cm v1.ConfigMap) {
logger.V(4).Info("excludeGroupRole did not change")
} else {
logger.V(2).Info("Updated resource excludeUsernames", "oldExcludeUsername", cd.excludeUsername, "newExcludeUsername", excludeUsernames)
// update filters
cd.excludeUsername = excludeUsernames
changed = true
}
}

return changed
}

//TODO: this has been added to backward support command line arguments
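ConfigData stores the new reconcile channel as a send-only chan<- bool, while the policy controller receives from it as a <-chan bool, so each side can only use the channel in one direction. A small sketch of that directional-channel idiom, with illustrative names:

package main

import "fmt"

// notifier can only send, mirroring ConfigData.reconcilePolicyReport.
type notifier struct {
    reconcileCh chan<- bool
}

func (n *notifier) configChanged() {
    n.reconcileCh <- true
}

// consume can only receive, mirroring forceReconciliation's reconcileCh parameter.
func consume(reconcileCh <-chan bool) {
    fmt.Println("reconcile requested, erase =", <-reconcileCh)
}

func main() {
    ch := make(chan bool, 1)
    n := &notifier{reconcileCh: ch}
    n.configChanged()
    consume(ch)
}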
109 changes: 109 additions & 0 deletions pkg/policy/report.go
@@ -1,12 +1,20 @@
package policy

import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
v1alpha1 "github.com/kyverno/kyverno/pkg/api/policyreport/v1alpha1"
kyvernoclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
policyreportlister "github.com/kyverno/kyverno/pkg/client/listers/policyreport/v1alpha1"
"github.com/kyverno/kyverno/pkg/engine/response"
"github.com/kyverno/kyverno/pkg/event"
"github.com/kyverno/kyverno/pkg/policyreport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

func (pc *PolicyController) report(policy string, engineResponses []*response.EngineResponse, logger logr.Logger) {
@@ -22,6 +30,107 @@ func (pc *PolicyController) report(policy string, engineResponses []*response.EngineResponse, logger logr.Logger) {
logger.V(4).Info("added a request to RCR generator", "key", info.ToKey())
}

// forceReconciliation forces a background scan by adding all policies to the workqueue
func (pc *PolicyController) forceReconciliation(reconcileCh <-chan bool, stopCh <-chan struct{}) {
logger := pc.log.WithName("forceReconciliation")
ticker := time.NewTicker(pc.reconcilePeriod)

for {
select {
case <-ticker.C:
logger.Info("performing the background scan", "scan interval", pc.reconcilePeriod.String())
if err := pc.policyReportEraser.EraseResultsEntries(eraseResultsEntries); err != nil {
logger.Error(err, "continue reconciling policy reports")
}

pc.requeuePolicies()

case erase := <-reconcileCh:
logger.Info("received the reconcile signal, reconciling policy report")
if erase {
if err := pc.policyReportEraser.EraseResultsEntries(eraseResultsEntries); err != nil {
logger.Error(err, "continue reconciling policy reports")
}
}

pc.requeuePolicies()

case <-stopCh:
return
}
}
}

func eraseResultsEntries(pclient *kyvernoclient.Clientset, reportLister policyreportlister.PolicyReportLister, clusterReportLister policyreportlister.ClusterPolicyReportLister) error {
var errors []string

if polrs, err := reportLister.List(labels.Everything()); err != nil {
errors = append(errors, err.Error())
} else {
for _, polr := range polrs {
polr.Results = []*v1alpha1.PolicyReportResult{}
polr.Summary = v1alpha1.PolicyReportSummary{}
if _, err = pclient.Wgpolicyk8sV1alpha1().PolicyReports(polr.GetNamespace()).Update(context.TODO(), polr, metav1.UpdateOptions{}); err != nil {
errors = append(errors, fmt.Sprintf("%s/%s/%s: %v", polr.Kind, polr.Namespace, polr.Name, err))
}
}
}

if cpolrs, err := clusterReportLister.List(labels.Everything()); err != nil {
errors = append(errors, err.Error())
} else {
for _, cpolr := range cpolrs {
cpolr.Results = []*v1alpha1.PolicyReportResult{}
cpolr.Summary = v1alpha1.PolicyReportSummary{}
if _, err = pclient.Wgpolicyk8sV1alpha1().ClusterPolicyReports().Update(context.TODO(), cpolr, metav1.UpdateOptions{}); err != nil {
errors = append(errors, fmt.Sprintf("%s/%s: %v", cpolr.Kind, cpolr.Name, err))
}
}
}

if len(errors) == 0 {
return nil
}

return fmt.Errorf("failed to erase results entries %v", strings.Join(errors, ";"))
}

func (pc *PolicyController) requeuePolicies() {
logger := pc.log.WithName("requeuePolicies")
if cpols, err := pc.pLister.List(labels.Everything()); err == nil {
for _, cpol := range cpols {
if !pc.canBackgroundProcess(cpol) {
continue
}
pc.enqueuePolicy(cpol)
}
} else {
logger.Error(err, "unable to list ClusterPolicies")
}

namespaces, err := pc.nsLister.List(labels.Everything())
if err != nil {
logger.Error(err, "unable to list namespaces")
return
}

for _, ns := range namespaces {
pols, err := pc.npLister.Policies(ns.GetName()).List(labels.Everything())
if err != nil {
logger.Error(err, "unable to list Policies", "namespace", ns.GetName())
continue
}

for _, p := range pols {
pol := ConvertPolicyToClusterPolicy(p)
if !pc.canBackgroundProcess(pol) {
continue
}
pc.enqueuePolicy(pol)
}
}
}
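requeuePolicies pushes every ClusterPolicy and namespaced Policy back through enqueuePolicy, which is not shown in this diff; controllers of this kind typically add a namespace/name key to a client-go rate-limited workqueue that the Run workers drain. A generic sketch of that enqueue/worker pattern, with illustrative key names rather than Kyverno's actual queue setup:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // A rate-limited queue like the ones controller Run loops drain.
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy")
    defer queue.ShutDown()

    // Requeueing a policy means adding its cache key; duplicate keys are coalesced.
    queue.Add("kyverno/sample-policy") // hypothetical namespace/name key
    queue.Add("kyverno/sample-policy")

    // A worker pops one key at a time and must call Done when finished with it.
    key, shutdown := queue.Get()
    if !shutdown {
        fmt.Println("processing", key)
        queue.Done(key)
    }
}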

func generateEvents(log logr.Logger, ers []*response.EngineResponse) []event.Info {
var eventInfos []event.Info
for _, er := range ers {