Skip to content

Commit

Permalink
Merge pull request #931 from petr-muller/cherry-pick-920-to-release-4.12
Browse files Browse the repository at this point in the history
OCPBUGS-12182: Update dnsPolicy to allow consistent resolution of the internal LB
  • Loading branch information
openshift-merge-robot committed May 4, 2023
2 parents a5c4031 + f9659cc commit f2620f6
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 68 deletions.
3 changes: 0 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/klog/v2"

_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
)

var (
Expand Down
4 changes: 3 additions & 1 deletion install/0000_00_cluster-version-operator_03_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ spec:
fieldPath: spec.nodeName
- name: CLUSTER_PROFILE
value: {{ .ClusterProfile }}
dnsPolicy: ClusterFirstWithHostNet
# This pod is hostNetwork and uses the internal LB DNS name when possible, which the kubelet also uses.
# This dnsPolicy allows us to use the same dnsConfig as the kubelet, without access to read it ourselves.
dnsPolicy: Default
hostNetwork: true
nodeSelector:
node-role.kubernetes.io/master: ""
Expand Down
15 changes: 10 additions & 5 deletions pkg/cincinnati/cincinnati.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,18 @@ const (
// Client is a Cincinnati client which can be used to fetch update graphs from
// an upstream Cincinnati stack.
type Client struct {
id uuid.UUID
transport *http.Transport
id uuid.UUID
transport *http.Transport
conditionRegistry clusterconditions.ConditionRegistry
}

// NewClient creates a new Cincinnati client with the given client identifier.
func NewClient(id uuid.UUID, transport *http.Transport) Client {
return Client{id: id, transport: transport}
func NewClient(id uuid.UUID, transport *http.Transport, conditionRegistry clusterconditions.ConditionRegistry) Client {
return Client{
id: id,
transport: transport,
conditionRegistry: conditionRegistry,
}
}

// Error is returned when we are unable to get updates.
Expand Down Expand Up @@ -216,7 +221,7 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, chann

for i := len(conditionalUpdates) - 1; i >= 0; i-- {
for j, risk := range conditionalUpdates[i].Risks {
conditionalUpdates[i].Risks[j].MatchingRules, err = clusterconditions.PruneInvalid(ctx, risk.MatchingRules)
conditionalUpdates[i].Risks[j].MatchingRules, err = c.conditionRegistry.PruneInvalid(ctx, risk.MatchingRules)
if len(conditionalUpdates[i].Risks[j].MatchingRules) == 0 {
klog.Warningf("Conditional update to %s, risk %q, has empty pruned matchingRules; dropping this target to avoid rejections when pushing to the Kubernetes API server. Pruning results: %s", conditionalUpdates[i].Release.Version, risk.Name, err)
conditionalUpdates = append(conditionalUpdates[:i], conditionalUpdates[i+1:]...)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cincinnati/cincinnati_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"reflect"
"testing"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"

"github.com/blang/semver/v4"
"github.com/google/uuid"
configv1 "github.com/openshift/api/config/v1"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
_ "k8s.io/klog/v2" // integration tests set glog flags.
)

Expand Down Expand Up @@ -604,7 +604,7 @@ func TestGetUpdates(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(handler))
defer ts.Close()

c := NewClient(clientID, nil)
c := NewClient(clientID, nil, standard.NewConditionRegistry(nil))

uri, err := url.Parse(ts.URL)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions pkg/clusterconditions/always/always.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ import (
"errors"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
)

// Always implements a cluster condition that always matches.
type Always struct{}

var always = &Always{}

// Valid returns an error if the condition contains any properties
// besides 'type'.
func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition) error {
Expand All @@ -30,7 +27,3 @@ func (a *Always) Valid(ctx context.Context, condition *configv1.ClusterCondition
func (a *Always) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
return true, nil
}

func init() {
clusterconditions.Register("Always", always)
}
45 changes: 34 additions & 11 deletions pkg/clusterconditions/clusterconditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,51 @@ type Condition interface {
Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error)
}

// Registry is a registry of implemented condition types.
var Registry map[string]Condition
// ConditionRegistry holds the implemented cluster condition types, keyed by
// condition type name, and uses them to validate and evaluate the matching
// rules of cluster conditions.
type ConditionRegistry interface {
// Register registers a condition type, and panics on any name collisions.
Register(conditionType string, condition Condition)

// PruneInvalid returns a new slice with recognized, valid conditions.
// The error complains about any unrecognized or invalid conditions.
PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error)

// Match returns whether the cluster matches the given rules (true),
// does not match (false), or the rules fail to evaluate (error).
Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error)
}

// conditionRegistry is the map-backed implementation of ConditionRegistry
// returned by NewConditionRegistry.
type conditionRegistry struct {
// registry is a registry of implemented condition types: it maps a
// condition type name to the Condition that handles it.
registry map[string]Condition
}

// NewConditionRegistry returns an empty ConditionRegistry. Condition types
// are added to it afterwards with Register.
func NewConditionRegistry() ConditionRegistry {
	return &conditionRegistry{
		registry: make(map[string]Condition),
	}
}

// Register registers a condition type, and panics on any name collisions.
func Register(conditionType string, condition Condition) {
if Registry == nil {
Registry = make(map[string]Condition, 1)
func (r *conditionRegistry) Register(conditionType string, condition Condition) {
if r.registry == nil {
r.registry = make(map[string]Condition, 1)
}
if existing, ok := Registry[conditionType]; ok && condition != existing {
if existing, ok := r.registry[conditionType]; ok && condition != existing {
panic(fmt.Sprintf("cluster condition %q already registered", conditionType))
}
Registry[conditionType] = condition
r.registry[conditionType] = condition
}

// PruneInvalid returns a new slice with recognized, valid conditions.
// The error complains about any unrecognized or invalid conditions.
func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) {
func (r *conditionRegistry) PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition) ([]configv1.ClusterCondition, error) {
var valid []configv1.ClusterCondition
var errs []error

for _, config := range matchingRules {
condition, ok := Registry[config.Type]
condition, ok := r.registry[config.Type]
if !ok {
errs = append(errs, fmt.Errorf("Skipping unrecognized cluster condition type %q", config.Type))
continue
Expand All @@ -63,11 +86,11 @@ func PruneInvalid(ctx context.Context, matchingRules []configv1.ClusterCondition

// Match returns whether the cluster matches the given rules (true),
// does not match (false), or the rules fail to evaluate (error).
func Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) {
func (r *conditionRegistry) Match(ctx context.Context, matchingRules []configv1.ClusterCondition) (bool, error) {
var errs []error

for _, config := range matchingRules {
condition, ok := Registry[config.Type]
condition, ok := r.registry[config.Type]
if !ok {
klog.V(2).Infof("Skipping unrecognized cluster condition type %q", config.Type)
continue
Expand Down
15 changes: 6 additions & 9 deletions pkg/clusterconditions/clusterconditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import (
"testing"

configv1 "github.com/openshift/api/config/v1"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
_ "github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/standard"
)

// Error implements a cluster condition that always errors.
Expand All @@ -33,6 +30,7 @@ func (e *Error) Match(ctx context.Context, condition *configv1.ClusterCondition)

func TestPruneInvalid(t *testing.T) {
ctx := context.Background()
registry := standard.NewConditionRegistry(nil)

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -100,7 +98,7 @@ func TestPruneInvalid(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
valid, err := clusterconditions.PruneInvalid(ctx, testCase.conditions)
valid, err := registry.PruneInvalid(ctx, testCase.conditions)
if !reflect.DeepEqual(valid, testCase.expectedValid) {
t.Errorf("got valid %v but expected %v", valid, testCase.expectedValid)
}
Expand All @@ -117,7 +115,8 @@ func TestPruneInvalid(t *testing.T) {

func TestMatch(t *testing.T) {
ctx := context.Background()
clusterconditions.Register("Error", &Error{})
registry := standard.NewConditionRegistry(nil)
registry.Register("Error", &Error{})

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -181,7 +180,7 @@ func TestMatch(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
match, err := clusterconditions.Match(ctx, testCase.conditions)
match, err := registry.Match(ctx, testCase.conditions)
if match != testCase.expectedMatch {
t.Errorf("got match %t but expected %t", match, testCase.expectedMatch)
}
Expand All @@ -194,6 +193,4 @@ func TestMatch(t *testing.T) {
}
})
}

delete(clusterconditions.Registry, "Error")
}
66 changes: 43 additions & 23 deletions pkg/clusterconditions/promql/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@ import (
"context"
"errors"
"fmt"
"net"
"time"

configv1 "github.com/openshift/api/config/v1"
"github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/cache"
)

// PromQL implements a cluster condition that matches based on PromQL.
type PromQL struct {
// Address holds the Prometheus query URI.
Address string
kubeClient kubernetes.Interface

// HTTPClientConfig holds the client configuration for connecting to the Prometheus service.
HTTPClientConfig config.HTTPClientConfig
Expand All @@ -32,23 +33,40 @@ type PromQL struct {
QueryTimeout time.Duration
}

var promql = &cache.Cache{
Condition: &PromQL{
Address: "https://thanos-querier.openshift-monitoring.svc.cluster.local:9091",
HTTPClientConfig: config.HTTPClientConfig{
Authorization: &config.Authorization{
Type: "Bearer",
CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
},
TLSConfig: config.TLSConfig{
CAFile: "/etc/tls/service-ca/service-ca.crt",
func NewPromQL(kubeClient kubernetes.Interface) *cache.Cache {
return &cache.Cache{
Condition: &PromQL{
kubeClient: kubeClient,
HTTPClientConfig: config.HTTPClientConfig{
Authorization: &config.Authorization{
Type: "Bearer",
CredentialsFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
},
TLSConfig: config.TLSConfig{
CAFile: "/etc/tls/service-ca/service-ca.crt",
// ServerName is used to verify the name of the service we will connect to using IP.
ServerName: "thanos-querier.openshift-monitoring.svc.cluster.local",
},
},
QueryTimeout: 5 * time.Minute,
},
QueryTimeout: 5 * time.Minute,
},
MinBetweenMatches: 10 * time.Minute,
MinForCondition: time.Hour,
Expiration: 24 * time.Hour,
MinBetweenMatches: 10 * time.Minute,
MinForCondition: time.Hour,
Expiration: 24 * time.Hour,
}
}

// Address returns the query URL of the thanos-querier, built from the cluster IP of the
// openshift-monitoring/thanos-querier service, to avoid requiring service DNS resolution.
// We do this so that our host-network pod can use the node's resolv.conf to resolve the internal load balancer name
// on the pod before DNS pods are available and before the service network is available. The side effect is that
// the CVO cannot resolve service DNS names.
//
// An error is returned when no Kubernetes client is configured, when the service cannot be
// retrieved, or when the service has no usable cluster IP.
func (p *PromQL) Address(ctx context.Context) (string, error) {
	const (
		namespace = "openshift-monitoring"
		name      = "thanos-querier"
		port      = "9091"
	)

	// A PromQL condition may be constructed without a client; report that as an
	// error instead of panicking on a nil dereference.
	if p.kubeClient == nil {
		return "", fmt.Errorf("no Kubernetes client configured to look up service %s/%s", namespace, name)
	}

	svc, err := p.kubeClient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return "", err
	}

	// An unset or headless ("None") cluster IP cannot be dialed; fail here rather
	// than hand back a malformed URL such as "https://:9091".
	clusterIP := svc.Spec.ClusterIP
	if clusterIP == "" || clusterIP == "None" {
		return "", fmt.Errorf("service %s/%s has no usable cluster IP (%q)", namespace, name, clusterIP)
	}

	return fmt.Sprintf("https://%s", net.JoinHostPort(clusterIP, port)), nil
}

// Valid returns an error if the condition contains any properties
Expand All @@ -69,7 +87,13 @@ func (p *PromQL) Valid(ctx context.Context, condition *configv1.ClusterCondition
// false when the PromQL evaluates to 0, and an error if the PromQL
// returns no time series or returns a value besides 0 or 1.
func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
clientConfig := api.Config{Address: p.Address}
// Look up the address on every attempt in case the service IP changes. This can happen when the thanos service is
// deleted and recreated.
address, err := p.Address(ctx)
if err != nil {
return false, fmt.Errorf("failure determine thanos IP: %w", err)
}
clientConfig := api.Config{Address: address}

if roundTripper, err := config.NewRoundTripperFromConfig(p.HTTPClientConfig, "cluster-conditions"); err == nil {
clientConfig.RoundTripper = roundTripper
Expand Down Expand Up @@ -122,7 +146,3 @@ func (p *PromQL) Match(ctx context.Context, condition *configv1.ClusterCondition
}
return false, fmt.Errorf("invalid PromQL result (must be 0 or 1): %v", sample.Value)
}

func init() {
clusterconditions.Register("PromQL", promql)
}
16 changes: 16 additions & 0 deletions pkg/clusterconditions/standard/standard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package standard

import (
"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/promql"
"k8s.io/client-go/kubernetes"
)

// NewConditionRegistry returns a condition registry with the "Always" and
// "PromQL" condition types registered. The given kubeClient is passed to the
// PromQL condition.
func NewConditionRegistry(kubeClient kubernetes.Interface) clusterconditions.ConditionRegistry {
	registry := clusterconditions.NewConditionRegistry()

	registry.Register("Always", &always.Always{})
	registry.Register("PromQL", promql.NewPromQL(kubeClient))

	return registry
}
Loading

0 comments on commit f2620f6

Please sign in to comment.