diff --git a/pkg/util/test/util.go b/pkg/util/test/util.go index 6b1d23a40..ce51e8ef2 100644 --- a/pkg/util/test/util.go +++ b/pkg/util/test/util.go @@ -3,9 +3,11 @@ package test import ( "context" "crypto/rand" + cryptoTls "crypto/tls" "encoding/base64" "fmt" "io" + "net" "net/http" "net/url" "os" @@ -19,8 +21,21 @@ import ( "github.com/golang/mock/gomock" "github.com/hashicorp/go-version" + "github.com/libopenstorage/openstorage/api" + "github.com/libopenstorage/operator/pkg/apis" + corev1 "github.com/libopenstorage/operator/pkg/apis/core/v1" + "github.com/libopenstorage/operator/pkg/mock" + "github.com/libopenstorage/operator/pkg/util" ocp_configv1 "github.com/openshift/api/config/v1" consolev1 "github.com/openshift/api/console/v1" + ocp_secv1 "github.com/openshift/api/security/v1" + appops "github.com/portworx/sched-ops/k8s/apps" + coreops "github.com/portworx/sched-ops/k8s/core" + k8serrors "github.com/portworx/sched-ops/k8s/errors" + operatorops "github.com/portworx/sched-ops/k8s/operator" + prometheusops "github.com/portworx/sched-ops/k8s/prometheus" + rbacops "github.com/portworx/sched-ops/k8s/rbac" + "github.com/portworx/sched-ops/task" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -45,20 +60,6 @@ import ( cluster_v1alpha1 "sigs.k8s.io/cluster-api/pkg/apis/deprecated/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - - "github.com/libopenstorage/openstorage/api" - "github.com/libopenstorage/operator/pkg/apis" - corev1 "github.com/libopenstorage/operator/pkg/apis/core/v1" - "github.com/libopenstorage/operator/pkg/mock" - "github.com/libopenstorage/operator/pkg/util" - ocp_secv1 "github.com/openshift/api/security/v1" - appops "github.com/portworx/sched-ops/k8s/apps" - coreops "github.com/portworx/sched-ops/k8s/core" - k8serrors "github.com/portworx/sched-ops/k8s/errors" - operatorops "github.com/portworx/sched-ops/k8s/operator" - prometheusops "github.com/portworx/sched-ops/k8s/prometheus" - rbacops "github.com/portworx/sched-ops/k8s/rbac" - "github.com/portworx/sched-ops/task" ) const ( @@ -106,6 +107,32 @@ const ( pxAnnotationPrefix = "portworx.io" + // TelemetryCertName is name of the telemetry cert. + TelemetryCertName = "pure-telemetry-certs" + + // EnvKeyPortworxHTTPProxy env var to use http proxy + EnvKeyPortworxHTTPProxy = "PX_HTTP_PROXY" + // EnvKeyPortworxHTTPSProxy env var to use https proxy + EnvKeyPortworxHTTPSProxy = "PX_HTTPS_PROXY" + // HttpProtocolPrefix is the prefix for HTTP protocol + HttpProtocolPrefix = "http://" + // HttpsProtocolPrefix is the prefix for HTTPS protocol + HttpsProtocolPrefix = "https://" + + // AnnotationTelemetryArcusLocation annotation indicates the location (internal/external) of Arcus + // that CCM should use + AnnotationTelemetryArcusLocation = pxAnnotationPrefix + "/arcus-location" + + // Telemetry default params + productionArcusLocation = "external" + productionArcusRestProxyURL = "rest.cloud-support.purestorage.com" + productionArcusRegisterProxyURL = "register.cloud-support.purestorage.com" + stagingArcusLocation = "internal" + stagingArcusRestProxyURL = "rest.staging-cloud-support.purestorage.com" + stagingArcusRegisterProxyURL = "register.staging-cloud-support.purestorage.com" + arcusPingInterval = 6 * time.Second + arcusPingRetry = 5 + defaultTelemetrySecretValidationTimeout = 30 * time.Second defaultTelemetrySecretValidationInterval = time.Second @@ -124,9 +151,6 @@ const ( defaultDeleteStorageClusterTimeout = 3 * time.Minute defaultDeleteStorageClusterInterval = 10 * time.Second - defaultValidateStorageClusterOnlineTimeout = 30 * time.Minute - defaultValidateStorageClusterOnlineInterval = 1 * time.Minute - defaultRunCmdInPxPodTimeout = 25 * time.Second defaultRunCmdInPxPodInterval = 5 * time.Second ) @@ -136,12 +160,17 @@ const ( var TestSpecPath = "testspec" var ( - pxVer2_12, _ = version.NewVersion("2.12.0-") opVer1_9_1, _ = version.NewVersion("1.9.1-") opVer1_10, _ = version.NewVersion("1.10.0-") opVer23_3, _ = version.NewVersion("23.3.0-") opVer23_5, _ = version.NewVersion("23.5.0-") + opVer23_7, _ = version.NewVersion("23.7.0-") minOpVersionForKubeSchedConfig, _ = version.NewVersion("1.10.2-") + + // minimumPxVersionCCMJAVA minimum PX version to install ccm-java + minimumPxVersionCCMJAVA, _ = version.NewVersion("2.8") + // minimumPxVersionCCMGO minimum PX version to install ccm-go + minimumPxVersionCCMGO, _ = version.NewVersion("2.12") ) // MockDriver creates a mock storage driver @@ -2886,7 +2915,7 @@ func ValidateMonitoring(pxImageList map[string]string, cluster *corev1.StorageCl // Increasing timeout for Telemetry components as they take quite long time to initialize defaultTelemetryRetryInterval := 30 * time.Second - defaultTelemetryTimeout := 10 * time.Minute + defaultTelemetryTimeout := 30 * time.Minute if err := ValidateTelemetry(pxImageList, cluster, defaultTelemetryTimeout, defaultTelemetryRetryInterval); err != nil { return err } @@ -3017,88 +3046,242 @@ func ValidateTelemetry(pxImageList map[string]string, cluster *corev1.StorageClu } logrus.Infof("PX Operator version: [%s]", opVersion.String()) - // Update Telemetry status for PX Operator 23.3+ - if opVersion.GreaterThanOrEqual(opVer23_3) { - newCluster, err := updateTelemetryStatus(pxImageList, cluster) - if err != nil { - return nil + if pxVersion.GreaterThanOrEqual(minimumPxVersionCCMGO) && opVersion.GreaterThanOrEqual(opVer1_10) { + if shouldTelemetryBeEnabled(cluster) { + return ValidateTelemetryV2Enabled(pxImageList, cluster, timeout, interval) + } + return ValidateTelemetryV2Disabled(cluster, timeout, interval) + } else { + if shouldTelemetryBeEnabled(cluster) { + return ValidateTelemetryV1Enabled(pxImageList, cluster, timeout, interval) } - cluster = newCluster + return ValidateTelemetryV1Disabled(cluster, timeout, interval) } +} + +// shouldTelemetryBeEnabled validates if Telemetry should be auto enabled/disabled by default +func shouldTelemetryBeEnabled(cluster *corev1.StorageCluster) bool { + logrus.Info("Checking if Telemetry should be enabled or disabled") + var shouldTelemetryBeEnabled bool + logrus.Info("Check PX and PX Operator versions to determine which Telemetry version to validate against..") + pxVersion := GetPortworxVersion(cluster) + logrus.Infof("PX Version: [%s]", pxVersion.String()) + opVersion, _ := GetPxOperatorVersion() + logrus.Infof("PX Operator version: [%s]", opVersion.String()) + + // Telemetry is disabled explicitly then leave it as is if cluster.Spec.Monitoring != nil && cluster.Spec.Monitoring.Telemetry != nil && - cluster.Spec.Monitoring.Telemetry.Enabled { - logrus.Info("Telemetry is enabled in StorageCluster") - if pxVersion.GreaterThanOrEqual(pxVer2_12) && opVersion.GreaterThanOrEqual(opVer1_10) { - if err := ValidateTelemetryV2Enabled(pxImageList, cluster, timeout, interval); err != nil { - return err - } - } else if err := ValidateTelemetryV1Enabled(pxImageList, cluster, timeout, interval); err != nil { - return err + !cluster.Spec.Monitoring.Telemetry.Enabled { + logrus.Debug("Telemetry is explicitly disabled in StorageCluster") + return false + } else { + logrus.Debug("Telemetry is explicitly enabled in StorageCluster") + shouldTelemetryBeEnabled = true + } + + // Telemetry is not supported in those cases, set to disabled + proxyType, proxy := GetPxProxyEnvVarValue(cluster) + if pxVersion.LessThan(minimumPxVersionCCMJAVA) { + // PX version is lower than 2.8 + logrus.Warnf("Telemetry is not supported on Portworx version: [%s]", pxVersion.String()) + return false + } else if !IsCCMGoSupported(pxVersion) { + // CCM Java case, PX version is between 2.8 and 2.12, we do not enabled Telemetry by default here, unless its already enabled in the spec + logrus.Warnf("Telemetry is Java based on Portworx version: [%s]", pxVersion.String()) + if shouldTelemetryBeEnabled { + logrus.Infof("Telemetry should be enabled") + return true + } + logrus.Infof("Telemetry should be disabled") + return false + } else if proxyType == EnvKeyPortworxHTTPProxy || proxyType == EnvKeyPortworxHTTPSProxy { + if proxyType == EnvKeyPortworxHTTPSProxy && opVersion.LessThan(opVer23_7) { + logrus.Warnf("Found [%s] env var. HTTPS proxy is only supported starting in PX Operator [23.7.0], Current PX Operator version is [%s]", EnvKeyPortworxHTTPSProxy, opVersion.String()) + logrus.Infof("Telemetry should be disabled") + return false } - if err := validateTelemetrySecret(cluster, defaultTelemetrySecretValidationTimeout, defaultTelemetrySecretValidationInterval, true); err != nil { - return err + // CCM Go is supported, but HTTP/HTTPS proxy cannot be split into host and port + if _, _, _, proxyFormatErr := ParsePxProxyURL(proxy); proxyFormatErr != nil { + logrus.Warnf("Telemetry is not supported with proxy in a format of: [%s] and should not be enabled", proxy) + return false } - return nil } - logrus.Info("Telemetry is disabled in StorageCluster") - if pxVersion.GreaterThanOrEqual(pxVer2_12) && opVersion.GreaterThanOrEqual(opVer1_10) { - return ValidateTelemetryV2Disabled(cluster, timeout, interval) + // Telemetry CCM Go is supported, at this point telemetry is enabled or can be enabled potentially + // If the secret exists, means telemetry was enabled before and registered already, enable telemetry directly + // If the telemetry secret doesn't exist, check if Arcus is reachable first before enabling telemetry: + // * If Arcus is not reachable, registration will fail anyway, or it's an air-gapped cluster, disable telemetry + // * If Arcus is reachable, enable telemetry by default + secret, err := coreops.Instance().GetSecret(TelemetryCertName, cluster.Namespace) + if err == nil { + logrus.Debugf("Found Telemetry secret [%s] in [%s] namespace, Telemetry was previously enabled", secret.Name, secret.Namespace) } - return ValidateTelemetryV1Disabled(cluster, timeout, interval) -} -// updateTelemetryStatus NOTE: This is a workaround to get Telemetry state again -// from StorageCluster after validating that PX is online and upgraded, because -// there are multiple condition for when it can be disabled/enabled after sometime -// 1) PX Operator 23.3+ enables Telemetry by default, if it can reach Pure1 endpont succcessfully -// 2) PX Operator 23.3+ will disable or not enabled Telemetry by default, if it cannot reach Pure1 endpoint, even if it was enabled by user -// 3) PX Operator will enable Telemetry after you upgrade to PX Operator 23.3+, If it is not disabled3) PX Operator will enable Telemetry after you upgrade to PX Operator 23.3+, If it is not disabled3) PX Operator will enable Telemetry after you upgrade to PX Operator 23.3+, If it is not disabled -func updateTelemetryStatus(pxImageList map[string]string, cluster *corev1.StorageCluster) (*corev1.StorageCluster, error) { - // Making sure cluster is online - liveCluster, err := ValidateStorageClusterIsOnline(cluster, defaultValidateStorageClusterOnlineTimeout, defaultValidateStorageClusterOnlineInterval) - if err != nil { - return nil, err + // If telemetry secret is not created yet, set telemetry as disabled if the registration endpoint is not reachable + // Only ping Arcus when telemetry secret is not found, otherwise the cluster was already registered before + if errors.IsNotFound(err) { + logrus.Debugf("Telemetry secret [%s] was not found, will try to reach to Pure1 to see if Telemetry should be auto enabled by default", TelemetryCertName) + if canAccess := CanAccessArcusRegisterEndpoint(cluster, proxy); !canAccess { + logrus.Warnf("Telemetry be disabled due to cannot reach to Pure1") + return false + } + logrus.Debug("Able to reach to Pure1") } - // Validate storagenodes are upgraded - if err := ValidateAllStorageNodesAreUpgraded(pxImageList, liveCluster); err != nil { - return nil, err - } + logrus.Infof("Telemetry should be enabled") + return true +} + +// IsCCMGoSupported returns true if px version is higher than 2.12 +func IsCCMGoSupported(pxVersion *version.Version) bool { + return pxVersion.GreaterThanOrEqual(minimumPxVersionCCMGO) +} - // Check Telemetry state before timeout - if liveCluster.Spec.Monitoring != nil && liveCluster.Spec.Monitoring.Telemetry != nil { - logrus.Debugf("Telemetry state in the StorageCluster [%s] before sleep is [%v]", liveCluster.Name, liveCluster.Spec.Monitoring.Telemetry.Enabled) +// ParsePxProxy trims protocol prefix then splits the proxy address of the form "host:port" with possible basic authentication credential +func ParsePxProxyURL(proxy string) (string, string, string, error) { + var authHeader string + + if strings.HasPrefix(proxy, HttpsProtocolPrefix) && strings.Contains(proxy, "@") { + proxyUrl, err := url.Parse(proxy) + if err != nil { + return "", "", "", fmt.Errorf("failed to parse px proxy url [%s]", proxy) + } + username := proxyUrl.User.Username() + password, _ := proxyUrl.User.Password() + encodedAuth := base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) + authHeader = fmt.Sprintf("Basic %s", encodedAuth) + host, port, err := net.SplitHostPort(proxyUrl.Host) + if err != nil { + return "", "", "", err + } else if host == "" || port == "" || encodedAuth == "" { + return "", "", "", fmt.Errorf("failed to split px proxy to get host and port [%s]", proxy) + } + return host, port, authHeader, nil } else { - logrus.Debugf("Telemetry state in the StorageCluster [%s] before sleep is [nil]", liveCluster.Name) + proxy = strings.TrimPrefix(proxy, HttpProtocolPrefix) + proxy = strings.TrimPrefix(proxy, HttpsProtocolPrefix) // treat https proxy as http proxy if no credential provided + host, port, err := net.SplitHostPort(proxy) + if err != nil { + return "", "", "", err + } else if host == "" || port == "" { + return "", "", "", fmt.Errorf("failed to split px proxy to get host and port [%s]", proxy) + } + return host, port, authHeader, nil } +} - logrus.Debugf("Sleeping for 2 minutes to get new Telemetry state") - time.Sleep(2 * time.Minute) +// CanAccessArcusRegisterEndpoint checks if telemetry registration endpoint is reachable +// return true immediately if it can reach to Arcus +// return false after failing 5 times in a row +func CanAccessArcusRegisterEndpoint( + cluster *corev1.StorageCluster, + proxy string, +) bool { + endpoint := getArcusRegisterProxyURL(cluster) + logrus.Debugf("Checking whether telemetry registration endpoint [%s] is accessible on cluster [%s]...", + endpoint, cluster.Name) + + url, _ := url.Parse(fmt.Sprintf("https://%s:443/auth/1.0/ping", endpoint)) + request := &http.Request{ + Method: "GET", + URL: url, + Header: map[string][]string{ + // 3 headers are required here by the API, but can use dummy values here. + // cluster UUID can be empty, so not using it for appliance-id here. + "product-name": {"portworx"}, + "appliance-id": {"portworx"}, + "component-sn": {cluster.Name}, + }, + } - // Get StorageCluster - newCluster, err := operatorops.Instance().GetStorageCluster(liveCluster.Name, liveCluster.Namespace) - if err != nil { - return nil, err + client := &http.Client{} + if proxy != "" { + if strings.Contains(strings.ToLower(proxy), "@") { + if !strings.HasPrefix(strings.ToLower(proxy), "https://") { + proxy = "https://" + proxy + } + } else { + if !strings.HasPrefix(strings.ToLower(proxy), "http://") { + proxy = "http://" + proxy + } + } + proxyURL, err := url.Parse(proxy) + if err != nil { + logrus.Warnf("failed to parse proxy [%s] for checking Pure1 connectivity, Err: %v", proxy, err) + return false + } + client.Transport = &http.Transport{ + Proxy: http.ProxyURL(proxyURL), + TLSClientConfig: &cryptoTls.Config{InsecureSkipVerify: true}, + } } - // Update Telemetry status - if cluster.Spec.Monitoring != nil && cluster.Spec.Monitoring.Telemetry != nil { - cluster.Spec.Monitoring.Telemetry.Enabled = newCluster.Spec.Monitoring.Telemetry.Enabled - } else { - cluster.Spec.Monitoring = newCluster.Spec.Monitoring + for i := 1; i <= arcusPingRetry; i++ { + response, err := client.Do(request) + warnMsg := fmt.Sprintf("Failed to access telemetry registration endpoint [%s]", endpoint) + if err != nil { + logrus.WithError(err).Warnf(warnMsg) + } else if response.StatusCode != 200 { + // Only consider 200 as a successful ping with a properly constructed request. + body, _ := io.ReadAll(response.Body) + response.Body.Close() + logrus.WithFields(logrus.Fields{ + "code": response.StatusCode, + "body": string(body), + }).Warnf(warnMsg) + } else { + logrus.Infof("Telemetry registration endpoint [%s] is accessible on cluster [%s]", endpoint, cluster.Name) + return true + } + if i != arcusPingRetry { + logrus.Warnf("Failed to ping Pure1 [%s], retrying...", endpoint) + time.Sleep(arcusPingInterval) + } } + return false +} - // Check Telemetry state before timeout - if liveCluster.Spec.Monitoring != nil && liveCluster.Spec.Monitoring.Telemetry != nil { - logrus.Debugf("Telemetry state in the StorageCluster [%s] after sleep is [%v]", liveCluster.Name, liveCluster.Spec.Monitoring.Telemetry.Enabled) - } else { - logrus.Debugf("Telemetry state in the StorageCluster [%s] after sleep is [nil]", liveCluster.Name) +func getArcusTelemetryLocation(cluster *corev1.StorageCluster) string { + if cluster.Annotations[AnnotationTelemetryArcusLocation] != "" { + location := strings.ToLower(strings.TrimSpace(cluster.Annotations[AnnotationTelemetryArcusLocation])) + if location == stagingArcusLocation { + return location + } } + return productionArcusLocation +} - return cluster, nil +func getArcusRegisterProxyURL(cluster *corev1.StorageCluster) string { + if getArcusTelemetryLocation(cluster) == stagingArcusLocation { + return stagingArcusRegisterProxyURL + } + return productionArcusRegisterProxyURL +} + +// GetPxProxyEnvVarValue returns the PX_HTTP(S)_PROXY environment variable value for a cluster. +// Note: we only expect one proxy for the telemetry CCM container but we prefer https over http if both are specified +func GetPxProxyEnvVarValue(cluster *corev1.StorageCluster) (string, string) { + httpProxy := "" + for _, env := range cluster.Spec.Env { + key, val := env.Name, env.Value + if key == EnvKeyPortworxHTTPSProxy { + // If http proxy is specified in https env var, treat it as a http proxy endpoint + if strings.HasPrefix(val, "http://") { + logrus.Warnf("Using endpoint [%s] from environment variable [%s] as a http proxy endpoint instead", + val, EnvKeyPortworxHTTPSProxy) + return EnvKeyPortworxHTTPProxy, val + } + return EnvKeyPortworxHTTPSProxy, val + } else if key == EnvKeyPortworxHTTPProxy { + httpProxy = val + } + } + if httpProxy != "" { + return EnvKeyPortworxHTTPProxy, httpProxy + } + return "", "" } // ValidateAllStorageNodesAreUpgraded validates that all storagenodes are online and have expected PX version