Implement logging telemetry log 1926 #1264

Merged
3 changes: 2 additions & 1 deletion Makefile
@@ -208,7 +208,8 @@ test-e2e-local: $(JUNITREPORT) deploy-image
 	IMAGE_CLUSTER_LOGGING_OPERATOR=image-registry.openshift-image-registry.svc:5000/openshift/origin-cluster-logging-operator:$(CURRENT_BRANCH) \
 	IMAGE_CLUSTER_LOGGING_OPERATOR_REGISTRY=image-registry.openshift-image-registry.svc:5000/openshift/cluster-logging-operator-registry:$(CURRENT_BRANCH) \
 	hack/test-e2e-olm.sh
-
+test-e2e-clo-metric:
+	test/e2e/telemetry/clometrics_test.sh
 test-svt:
 	hack/svt/test-svt.sh
(ServiceMonitor manifest; file name not captured in this view)
@@ -1,13 +1,14 @@
 # Prometheus Monitor Service (Metrics)
 apiVersion: monitoring.coreos.com/v1
 kind: ServiceMonitor
 metadata:
   labels:
-    control-plane: controller-manager
+    control-plane: cluster-logging-operator
   name: cluster-logging-operator-metrics-monitor
 spec:
   endpoints:
-  - path: /metrics
-    targetPort: 8686
+  - port: http-metrics
+  namespaceSelector: {}
   selector:
     matchLabels:
-      control-plane: cluster-logging-operator
+      name: cluster-logging-operator
11 changes: 4 additions & 7 deletions config/prometheus/monitor.yaml
@@ -1,17 +1,14 @@
-
 # Prometheus Monitor Service (Metrics)
 apiVersion: monitoring.coreos.com/v1
 kind: ServiceMonitor
 metadata:
   labels:
-    control-plane: controller-manager
+    control-plane: cluster-logging-operator
   name: cluster-logging-operator-metrics-monitor
-  namespace: openshift-logging
 spec:
   endpoints:
-  - path: /metrics
-    targetPort: 8686
-    port: 8686
+  - port: http-metrics
+  namespaceSelector: {}
   selector:
     matchLabels:
-      control-plane: cluster-logging-operator
+      name: cluster-logging-operator
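Both ServiceMonitor diffs make the same two changes: the scrape endpoint moves from a literal `targetPort: 8686` to the named port `http-metrics`, and the selector moves from the `control-plane` label to `name: cluster-logging-operator`. Scraping therefore depends on the operator's metrics Service exposing a port with exactly that name. As a minimal sketch only (this Service definition is not part of the diff; the variable name and numbers are inferred from the ServiceMonitor above), the corresponding port would look like:

```go
package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// metricsPort sketches the Service port the ServiceMonitor selects by name.
// "http-metrics" must match `port: http-metrics` above; the numeric target
// (8686, the old targetPort) becomes an internal detail of the Service.
var metricsPort = corev1.ServicePort{
	Name:       "http-metrics",
	Port:       8686,
	TargetPort: intstr.FromInt(8686),
}
```

Matching by port name also means the selector change must agree with the labels on that same Service; the two edits travel together.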
4 changes: 2 additions & 2 deletions controllers/clusterlogging/clusterlogging_controller.go
@@ -2,13 +2,13 @@ package clusterlogging

 import (
 	"context"
-	ctrl "sigs.k8s.io/controller-runtime"
 	"time"
+
+	ctrl "sigs.k8s.io/controller-runtime"

 	"github.com/ViaQ/logerr/log"
 	loggingv1 "github.com/openshift/cluster-logging-operator/apis/logging/v1"
 	"github.com/openshift/cluster-logging-operator/internal/k8shandler"
-
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/tools/record"
131 changes: 123 additions & 8 deletions internal/k8shandler/reconciler.go
@@ -5,11 +5,13 @@ import (
 	"errors"
 	"fmt"
 	"github.com/openshift/cluster-logging-operator/internal/metrics"
+	"strconv"

 	"github.com/ViaQ/logerr/log"
 	configv1 "github.com/openshift/api/config/v1"
 	logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
 	"github.com/openshift/cluster-logging-operator/internal/status"
+	"github.com/openshift/cluster-logging-operator/internal/telemetry"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/client-go/tools/record"
@@ -39,17 +41,17 @@ func Reconcile(requestCluster *logging.ClusterLogging, requestClient client.Client
 	if clusterLoggingRequest.IncludesManagedStorage() {
 		// Reconcile certs
 		if err = clusterLoggingRequest.CreateOrUpdateCertificates(); err != nil {
-			return fmt.Errorf("Unable to create or update certificates for %q: %v", clusterLoggingRequest.Cluster.Name, err)
+			return fmt.Errorf("unable to create or update certificates for %q: %v", clusterLoggingRequest.Cluster.Name, err)
 		}

 		// Reconcile Log Store
 		if err = clusterLoggingRequest.CreateOrUpdateLogStore(); err != nil {
-			return fmt.Errorf("Unable to create or update logstore for %q: %v", clusterLoggingRequest.Cluster.Name, err)
+			return fmt.Errorf("unable to create or update logstore for %q: %v", clusterLoggingRequest.Cluster.Name, err)
 		}

 		// Reconcile Visualization
 		if err = clusterLoggingRequest.CreateOrUpdateVisualization(); err != nil {
-			return fmt.Errorf("Unable to create or update visualization for %q: %v", clusterLoggingRequest.Cluster.Name, err)
+			return fmt.Errorf("unable to create or update visualization for %q: %v", clusterLoggingRequest.Cluster.Name, err)
 		}

 	} else {
@@ -58,7 +60,7 @@

 		// Remove Curator
 		if err := clusterLoggingRequest.removeCurator(); err != nil {
-			log.V(1).Error(err, "Error removing curator component")
+			log.V(0).Error(err, "Error removing curator component")
 		}
 		clusterLoggingRequest.Cluster.Status.Conditions.SetCondition(status.Condition{
 			Type: "CuratorRemoved",
@@ -69,22 +71,39 @@

 	// Reconcile Collection
 	if err = clusterLoggingRequest.CreateOrUpdateCollection(); err != nil {
-		return fmt.Errorf("Unable to create or update collection for %q: %v", clusterLoggingRequest.Cluster.Name, err)
+		telemetry.Data.CollectorErrorCount.Inc("CollectorErrorCount")
+		return fmt.Errorf("unable to create or update collection for %q: %v", clusterLoggingRequest.Cluster.Name, err)
 	}

 	// Reconcile metrics Dashboards
 	if err = metrics.ReconcileDashboards(clusterLoggingRequest.Client); err != nil {
 		return fmt.Errorf("Unable to create or update metrics dashboards for %q: %w", clusterLoggingRequest.Cluster.Name, err)
 	}

+	///////
+
+	if clusterLoggingRequest.IncludesManagedStorage() {
+		updateCLInfo := UpdateInfofromCL(&clusterLoggingRequest)
+		if updateCLInfo != nil {
+			log.V(1).Info("Error in updating clusterlogging info for updating metrics", "updateCLInfo", updateCLInfo)
+		}
+		erru := telemetry.UpdateMetrics()
+		if erru != nil {
+			log.V(1).Error(erru, "Error in updating clo metrics for telemetry")
+		}
+	}
+	///////
+	// if it reaches this point without any errors, then mark CL in healthy state
+	telemetry.Data.CLInfo.Set("healthStatus", "0")

 	return nil
 }

 func removeManagedStorage(clusterRequest ClusterLoggingRequest) {
-	log.V(1).Info("Removing managed store components...")
+	log.V(0).Info("Removing managed store components...")
 	for _, remove := range []func() error{clusterRequest.removeElasticsearch, clusterRequest.removeKibana} {
 		if err := remove(); err != nil {
-			log.V(1).Error(err, "Error removing component")
+			log.V(0).Error(err, "Error removing component")
 		}
 	}
 }
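Review note: `telemetry.Data.*.Set(...)`, `Data.CollectorErrorCount.Inc(...)`, and `telemetry.UpdateMetrics()` are the entire surface this file uses from the new `internal/telemetry` package, whose implementation is not part of this hunk. As a rough mental model only, here is a minimal sketch of a shape that would satisfy these call sites; the types, the initial label sets, and the metric names are assumptions, not the PR's actual code:

```go
// Package telemetry: an assumed, minimal sketch inferred from the call sites
// in this diff (Set, Inc, the exported M map, UpdateMetrics); it is not the
// PR's real implementation, and the metric names here are illustrative.
package telemetry

import "github.com/prometheus/client_golang/prometheus"

// InfoMap holds label-name -> "0"/"1" values. M is exported because
// UpdateInfofromCLF ranges over it to reset and re-set per-label flags.
type InfoMap struct{ M map[string]string }

func (i *InfoMap) Set(key, value string) { i.M[key] = value }

// Counter lets call sites bump a named error count, e.g.
// Data.CollectorErrorCount.Inc("CollectorErrorCount").
type Counter struct{ c prometheus.Counter }

func (c *Counter) Inc(_ string) { c.c.Inc() }

// TData mirrors the fields this PR touches.
type TData struct {
	CLInfo, CLFInfo, CLFInputType, CLFOutputType, CLOutputType *InfoMap
	CollectorErrorCount                                        *Counter
}

var Data = &TData{
	CLInfo:        &InfoMap{M: map[string]string{"healthStatus": "0", "managedStatus": "0"}},
	CLFInfo:       &InfoMap{M: map[string]string{"healthStatus": "0", "pipelineInfo": "0"}},
	CLFInputType:  &InfoMap{M: map[string]string{"application": "0", "infrastructure": "0", "audit": "0"}},
	CLFOutputType: &InfoMap{M: map[string]string{"elasticsearch": "0", "kafka": "0"}},
	CLOutputType:  &InfoMap{M: map[string]string{"elasticsearch": "0"}},
	CollectorErrorCount: &Counter{c: prometheus.NewCounter(
		prometheus.CounterOpts{Name: "log_collector_error_count_total", Help: "collector reconcile errors"})},
}

var clInfo = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "log_logging_info", Help: "CL health and managed state"},
	[]string{"healthStatus", "managedStatus"},
)

// UpdateMetrics flushes the current map values into the gauges so the
// /metrics endpoint (scraped via the ServiceMonitors above) reflects them.
func UpdateMetrics() error {
	clInfo.Reset()
	clInfo.WithLabelValues(Data.CLInfo.M["healthStatus"], Data.CLInfo.M["managedStatus"]).Set(1)
	return nil
}
```

Note the convention implied by the hunk's own comments: `healthStatus` is written as "0" on the healthy path, so a non-zero value is presumably reserved for error reporting elsewhere in the package.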
@@ -111,11 +130,26 @@ func ReconcileForClusterLogForwarder(forwarder *logging.ClusterLogForwarder, requestClient client.Client

 	// Reconcile Collection
 	err = clusterLoggingRequest.CreateOrUpdateCollection()
 	forwarder.Status = clusterLoggingRequest.ForwarderRequest.Status
+
 	if err != nil {
 		msg := fmt.Sprintf("Unable to reconcile collection for %q: %v", clusterLoggingRequest.Cluster.Name, err)
 		log.Error(err, msg)
 		return errors.New(msg)
 	}
+
+	// if it reaches this point without throwing any errors, then mark CLF in healthy state
+	///////
+	updateCLFInfo := UpdateInfofromCLF(&clusterLoggingRequest)
+	if updateCLFInfo != nil {
+		log.V(1).Info("Error in updating CLF Info for CLF specific metrics", "updateCLFInfo", updateCLFInfo)
+	}
+	erru := telemetry.UpdateMetrics()
+	if erru != nil {
+		log.V(0).Error(erru, "Error in updating clo metrics for telemetry")
+	}
+	///////
+	telemetry.Data.CLFInfo.Set("healthStatus", "0")
+
 	return nil
 }

@@ -144,7 +178,7 @@ func ReconcileForGlobalProxy(proxyConfig *configv1.Proxy, requestClient client.Client

 	// Reconcile Collection
 	if err = clusterLoggingRequest.CreateOrUpdateCollection(); err != nil {
-		return fmt.Errorf("Unable to create or update collection for %q: %v", clusterLoggingRequest.Cluster.Name, err)
+		return fmt.Errorf("unable to create or update collection for %q: %v", clusterLoggingRequest.Cluster.Name, err)
 	}

 	return nil
@@ -200,3 +234,84 @@ func (clusterRequest *ClusterLoggingRequest) getLogForwarder() *logging.ClusterLogForwarder

 	return forwarder
 }
+
+func UpdateInfofromCL(request *ClusterLoggingRequest) (err error) {
+
+	// CLO has two custom resources, CL and CLF; CLF is meant for forwarding logs to third-party systems.
+	// Here we update the CL configuration parameters.
+	clspec := request.Cluster.Spec
+
+	// the default LogStore is the internal Elasticsearch cluster running within OCP
+	if clspec.LogStore != nil {
+		log.V(1).Info("LogStore Type", "clspecLogStoreType", clspec.LogStore.Type)
+		if clspec.LogStore.Type == "elasticsearch" {
+			telemetry.Data.CLOutputType.Set("elasticsearch", "1")
+		} else {
+			telemetry.Data.CLOutputType.Set("elasticsearch", "0")
+		}
+	}
+
+	if request.Cluster.Spec.ManagementState == logging.ManagementStateManaged || request.Cluster.Spec.ManagementState == "" {
+		log.V(1).Info("managedStatus : Managed")
+		telemetry.Data.CLInfo.Set("managedStatus", "1") // Managed state indicator
+	} else {
+		log.V(1).Info("managedStatus : Unmanaged")
+		telemetry.Data.CLInfo.Set("managedStatus", "0") // Unmanaged state indicator
+	}
+
+	return nil
+}
+
+func UpdateInfofromCLF(request *ClusterLoggingRequest) (err error) {
+
+	// Here we update the CLF spec parameters
+
+	var npipelines = 0
+	var output *logging.OutputSpec
+	var found bool
+
+	// CLO has two custom resources, CL and CLF; CLF is meant for forwarding logs to third-party systems.
+
+	// CLO CLF pipelines and set of output specs
+	lgpipeline := request.ForwarderSpec.Pipelines
+	outputs := request.ForwarderSpec.OutputMap()
+	log.V(1).Info("OutputMap", "outputs", outputs)
+
+	for _, pipeline := range lgpipeline {
+		npipelines++
+		log.V(1).Info("pipelines", "npipelines", npipelines)
+		inref := pipeline.InputRefs
+		outref := pipeline.OutputRefs
+
+		for labelname := range telemetry.Data.CLFInputType.M {
+			log.V(1).Info("iter over labelnames", "labelname", labelname)
+			telemetry.Data.CLFInputType.Set(labelname, "0") // reset to zero
+			for _, inputtype := range inref {
+				log.V(1).Info("iter over inputtype", "inputtype", inputtype)
+				if inputtype == labelname {
+					log.V(1).Info("labelname and inputtype", "labelname", labelname, "inputtype", inputtype) // matched labelname with input type stated in CLF spec
+					telemetry.Data.CLFInputType.Set(labelname, "1") // input type present in CLF spec
+				}
+			}
+		}
+
+		for labelname := range telemetry.Data.CLFOutputType.M {
+			log.V(1).Info("iter over labelnames", "labelname", labelname)
+			telemetry.Data.CLFOutputType.Set(labelname, "0") // reset to zero
+			for _, outputname := range outref {
+				log.V(1).Info("iter over outref", "outputname", outputname)
+				output, found = outputs[outputname]
+				if found {
+					outputtype := output.Type
+					if outputtype == labelname {
+						log.V(1).Info("labelname and outputtype", "labelname", labelname, "outputtype", outputtype)
+						telemetry.Data.CLFOutputType.Set(labelname, "1") // matched labelname with output type stated in CLF spec
+					}
+				}
+			}
+		}
+		log.V(1).Info("post updating inputtype and outputtype")
+		telemetry.Data.CLFInfo.Set("pipelineInfo", strconv.Itoa(npipelines))
+	}
+	return nil
+}
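To trace the bookkeeping in `UpdateInfofromCLF` concretely, the sketch below walks one hypothetical pipeline through the same reset-then-set loops. The `PipelineSpec`/`OutputSpec` literals are simplified stand-ins for the `logging` API types, and the pipeline and output names are invented for illustration:

```go
package main

import "fmt"

// Simplified stand-ins for the logging API types the function uses.
type OutputSpec struct{ Name, Type string }
type PipelineSpec struct{ InputRefs, OutputRefs []string }

func main() {
	// Hypothetical CLF: one pipeline forwarding application logs to an
	// elasticsearch output named "remote-es".
	pipelines := []PipelineSpec{{
		InputRefs:  []string{"application"},
		OutputRefs: []string{"remote-es"},
	}}
	outputs := map[string]OutputSpec{
		"remote-es": {Name: "remote-es", Type: "elasticsearch"},
	}

	// Label maps as the telemetry package would initialize them (all off).
	inputType := map[string]string{"application": "0", "infrastructure": "0", "audit": "0"}
	outputType := map[string]string{"elasticsearch": "0", "kafka": "0"}

	for _, p := range pipelines {
		// Same shape as UpdateInfofromCLF: reset each flag, then set it
		// if this pipeline references a matching input or output type.
		for label := range inputType {
			inputType[label] = "0"
			for _, in := range p.InputRefs {
				if in == label {
					inputType[label] = "1"
				}
			}
		}
		for label := range outputType {
			outputType[label] = "0"
			for _, name := range p.OutputRefs {
				if out, ok := outputs[name]; ok && out.Type == label {
					outputType[label] = "1"
				}
			}
		}
	}
	fmt.Println(inputType)  // map[application:1 audit:0 infrastructure:0]
	fmt.Println(outputType) // map[elasticsearch:1 kafka:0]
}
```

One behavior worth flagging in review: the per-label reset happens inside the loop over pipelines, so with several pipelines the flags reflect only the last pipeline iterated, and `pipelineInfo` is likewise set on every iteration rather than once after the loop.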