Skip to content

Commit

Permalink
LOG-3527: Collector to act as rsyslog server
Browse files Browse the repository at this point in the history
  • Loading branch information
jlarriba committed Nov 29, 2023
1 parent e1612dc commit 34a5485
Show file tree
Hide file tree
Showing 28 changed files with 425 additions and 92 deletions.
22 changes: 15 additions & 7 deletions apis/logging/v1/cluster_log_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"reflect"
"strings"

"github.com/openshift/cluster-logging-operator/internal/constants"
"github.com/openshift/cluster-logging-operator/internal/utils/sets"
)

Expand Down Expand Up @@ -196,10 +195,19 @@ func (output *OutputSpec) GetMaxRecordsPerSecond() int64 {
return output.Limit.MaxRecordsPerSecond
}

func (receiver HTTPReceiver) GetPort() (ret int32) {
ret = constants.HTTPReceiverPort
if receiver.Port != 0 {
ret = receiver.Port
}
return
func IsAuditHttpReceiver(input *InputSpec) (ret bool) {
return input.Receiver != nil &&
input.Receiver.HTTP != nil &&
input.Receiver.Type == ReceiverTypeHttp &&
input.Receiver.HTTP.Format == FormatKubeAPIAudit
}

func IsHttpReceiver(input *InputSpec) (ret bool) {
return input.Receiver != nil &&
input.Receiver.Type == ReceiverTypeHttp
}

func IsSyslogReceiver(input *InputSpec) (ret bool) {
return input.Receiver != nil &&
input.Receiver.Type == ReceiverTypeSyslog
}
41 changes: 36 additions & 5 deletions apis/logging/v1/input_receiver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,37 @@ package v1

// NOTE: The Enum validation on ReceiverSpec.Type must be updated if the list of types changes.

// Receiver type constants, must match JSON tags of OutputTypeSpec fields.
const (
ReceiverTypeHttp = "http"
ReceiverTypeSyslog = "syslog"

FormatKubeAPIAudit = "kubeAPIAudit" // Log events in k8s list format, e.g. API audit log events.
)

// ReceiverSpec is a union of input Receiver types.
//
// The fields of this struct define the set of known Receiver types.
type ReceiverSpec struct {
HTTP *HTTPReceiver `json:"http,omitempty"`

// Type of Receiver plugin.
//
// +kubebuilder:validation:Enum:=http;syslog
// +required
Type string `json:"type"`

// The ReceiverTypeSpec that handles particular parameters
*ReceiverTypeSpec `json:",inline"`
}

const (
FormatKubeAPIAudit = "kubeAPIAudit" // Log events in k8s list format, e.g. API audit log events.
)
type ReceiverTypeSpec struct {
HTTP *HTTPReceiver `json:"http,omitempty"`
Syslog *SyslogReceiver `json:"syslog,omitempty"`
}

// HTTPReceiver receives encoded logs as a HTTP endpoint.
type HTTPReceiver struct {
// Port the Service and the HTTP listener listen on.
// Port the Receiver listens on.
// +kubebuilder:default:=8443
// +optional
Port int32 `json:"port"`
Expand All @@ -26,3 +43,17 @@ type HTTPReceiver struct {
// +required
Format string `json:"format"`
}

// SyslogReceiver receives logs from rsyslog
type SyslogReceiver struct {
// Port the Receiver listens on.
// +kubebuilder:default:=10514
// +optional
Port int32 `json:"port"`

// The protocol of the connection the Receiver will listen on: tcp or upd
// +kubebuilder:validation:Enum=tcp;udp
// +kubebuilder:default:=tcp
// +optional
Protocol string `json:"protocol"`
}
46 changes: 43 additions & 3 deletions apis/logging/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bundle/manifests/clusterlogging.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ metadata:
certified: "false"
console.openshift.io/plugins: '["logging-view-plugin"]'
containerImage: quay.io/openshift-logging/cluster-logging-operator:latest
createdAt: "2023-10-30T13:13:37Z"
createdAt: "2023-11-29T10:22:58Z"
description: The Red Hat OpenShift Logging Operator for OCP provides a means for
configuring and managing your aggregated logging stack.
olm.skipRange: '>=5.7.0-0 <5.9.0'
Expand Down
28 changes: 26 additions & 2 deletions bundle/manifests/logging.openshift.io_clusterlogforwarders.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,37 @@ spec:
type: string
port:
default: 8443
description: Port the Service and the HTTP listener
listen on.
description: Port the Receiver listens on.
format: int32
type: integer
required:
- format
type: object
syslog:
description: SyslogReceiver receives logs from rsyslog
properties:
port:
default: 10514
description: Port the Receiver listens on.
format: int32
type: integer
protocol:
default: tcp
description: 'The protocol of the connection the Receiver
will listen on: tcp or upd'
enum:
- tcp
- udp
type: string
type: object
type:
description: Type of Receiver plugin.
enum:
- http
- syslog
type: string
required:
- type
type: object
required:
- name
Expand Down
28 changes: 26 additions & 2 deletions config/crd/bases/logging.openshift.io_clusterlogforwarders.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,37 @@ spec:
type: string
port:
default: 8443
description: Port the Service and the HTTP listener
listen on.
description: Port the Receiver listens on.
format: int32
type: integer
required:
- format
type: object
syslog:
description: SyslogReceiver receives logs from rsyslog
properties:
port:
default: 10514
description: Port the Receiver listens on.
format: int32
type: integer
protocol:
default: tcp
description: 'The protocol of the connection the Receiver
will listen on: tcp or upd'
enum:
- tcp
- udp
type: string
type: object
type:
description: Type of Receiver plugin.
enum:
- http
- syslog
type: string
required:
- type
type: object
required:
- name
Expand Down
32 changes: 19 additions & 13 deletions internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
logKubeapiserver = "varlogkubeapiserver"
logKubeapiserverValue = "/var/log/kube-apiserver"
metricsVolumePath = "/etc/collector/metrics"
httpInputVolumePath = "/etc/collector/"
receiverInputVolumePath = "/etc/collector/receiver"
tmpVolumeName = "tmp"
tmpPath = "/tmp"
)
Expand Down Expand Up @@ -121,13 +121,13 @@ func New(confHash, clusterID string, collectorSpec logging.CollectionSpec, secre
return factory
}

func (f *Factory) NewDaemonSet(namespace, name string, trustedCABundle *v1.ConfigMap, tlsProfileSpec configv1.TLSProfileSpec, httpInputs []string) *apps.DaemonSet {
podSpec := f.NewPodSpec(trustedCABundle, f.ForwarderSpec, f.ClusterID, f.TrustedCAHash, tlsProfileSpec, httpInputs, namespace)
func (f *Factory) NewDaemonSet(namespace, name string, trustedCABundle *v1.ConfigMap, tlsProfileSpec configv1.TLSProfileSpec, receiverInputs []string) *apps.DaemonSet {
podSpec := f.NewPodSpec(trustedCABundle, f.ForwarderSpec, f.ClusterID, f.TrustedCAHash, tlsProfileSpec, receiverInputs, namespace)
ds := factory.NewDaemonSet(name, namespace, f.ResourceNames.CommonName, constants.CollectorName, string(f.CollectorSpec.Type), *podSpec, f.CommonLabelInitializer, f.PodLabelVisitor)
return ds
}

func (f *Factory) NewPodSpec(trustedCABundle *v1.ConfigMap, forwarderSpec logging.ClusterLogForwarderSpec, clusterID, trustedCAHash string, tlsProfileSpec configv1.TLSProfileSpec, httpInputs []string, namespace string) *v1.PodSpec {
func (f *Factory) NewPodSpec(trustedCABundle *v1.ConfigMap, forwarderSpec logging.ClusterLogForwarderSpec, clusterID, trustedCAHash string, tlsProfileSpec configv1.TLSProfileSpec, receiverInputs []string, namespace string) *v1.PodSpec {

podSpec := &v1.PodSpec{
NodeSelector: utils.EnsureLinuxNodeSelector(f.NodeSelector()),
Expand All @@ -149,17 +149,17 @@ func (f *Factory) NewPodSpec(trustedCABundle *v1.ConfigMap, forwarderSpec loggin
{Name: tmpVolumeName, VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{Medium: v1.StorageMediumMemory}}},
},
}
for _, httpInput := range httpInputs {
for _, receiverInput := range receiverInputs {
podSpec.Volumes = append(podSpec.Volumes,
v1.Volume{Name: httpInput, VolumeSource: v1.VolumeSource{Secret: &v1.SecretVolumeSource{SecretName: httpInput}}},
v1.Volume{Name: receiverInput, VolumeSource: v1.VolumeSource{Secret: &v1.SecretVolumeSource{SecretName: receiverInput}}},
)
}

podSpec.Tolerations = append(podSpec.Tolerations, f.Tolerations()...)

secretNames := AddSecretVolumes(podSpec, forwarderSpec)

collector := f.NewCollectorContainer(secretNames, clusterID, httpInputs)
collector := f.NewCollectorContainer(secretNames, clusterID, receiverInputs)

addTrustedCABundle(collector, podSpec, trustedCABundle, f.ResourceNames.CaTrustBundle)

Expand All @@ -176,7 +176,7 @@ func (f *Factory) NewPodSpec(trustedCABundle *v1.ConfigMap, forwarderSpec loggin

// NewCollectorContainer is a constructor for creating the collector container spec. Note the secretNames are assumed
// to be a unique list
func (f *Factory) NewCollectorContainer(secretNames []string, clusterID string, httpInputs []string) *v1.Container {
func (f *Factory) NewCollectorContainer(secretNames []string, clusterID string, receiverInputs []string) *v1.Container {

collector := factory.NewContainer(constants.CollectorName, f.ImageName, v1.PullIfNotPresent, f.CollectorResourceRequirements())
collector.Ports = []v1.ContainerPort{
Expand Down Expand Up @@ -210,9 +210,9 @@ func (f *Factory) NewCollectorContainer(secretNames []string, clusterID string,
{Name: f.ResourceNames.SecretMetrics, ReadOnly: true, MountPath: metricsVolumePath},
{Name: tmpVolumeName, MountPath: tmpPath},
}
for _, httpInput := range httpInputs {
for _, receiverInput := range receiverInputs {
collector.VolumeMounts = append(collector.VolumeMounts,
v1.VolumeMount{Name: httpInput, ReadOnly: true, MountPath: httpInputVolumePath + httpInput},
v1.VolumeMount{Name: receiverInput, ReadOnly: true, MountPath: path.Join(receiverInputVolumePath, receiverInput)},
)
}

Expand All @@ -229,9 +229,15 @@ func (f *Factory) ReconcileInputServices(er record.EventRecorder, k8sClient clie
}

for _, input := range f.ForwarderSpec.Inputs {
if input.Receiver != nil && input.Receiver.HTTP != nil {
listenPort := input.Receiver.HTTP.GetPort()
if err := network.ReconcileInputService(er, k8sClient, namespace, input.Name, selectorComponent, input.Name, listenPort, listenPort, owner, visitors); err != nil {
var listenPort int32
if input.Receiver != nil && input.Receiver.ReceiverTypeSpec != nil {
if logging.IsHttpReceiver(&input) {
listenPort = input.Receiver.HTTP.Port
}
if logging.IsSyslogReceiver(&input) {
listenPort = input.Receiver.Syslog.Port
}
if err := network.ReconcileInputService(er, k8sClient, namespace, input.Name, selectorComponent, input.Name, listenPort, listenPort, input.Receiver.Type, owner, visitors); err != nil {
return err
}
}
Expand Down
9 changes: 5 additions & 4 deletions internal/collector/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

log "github.com/ViaQ/logerr/v2/log/static"
logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/internal/reconcile"
"github.com/openshift/cluster-logging-operator/internal/runtime"
"github.com/openshift/cluster-logging-operator/internal/tls"
Expand All @@ -21,14 +22,14 @@ func (f *Factory) ReconcileDaemonset(er record.EventRecorder, k8sClient client.C
f.TrustedCAHash = trustHash
tlsProfile, _ := tls.FetchAPIServerTlsProfile(k8sClient)

var httpInputs []string
var receiverInputs []string
for _, input := range f.ForwarderSpec.Inputs {
if input.Receiver != nil && input.Receiver.HTTP != nil {
httpInputs = append(httpInputs, input.Name)
if logging.IsHttpReceiver(&input) || logging.IsSyslogReceiver(&input) {
receiverInputs = append(receiverInputs, input.Name)
}
}

desired := f.NewDaemonSet(namespace, f.ResourceNames.DaemonSetName(), trustedCABundle, tls.GetClusterTLSProfileSpec(tlsProfile), httpInputs)
desired := f.NewDaemonSet(namespace, f.ResourceNames.DaemonSetName(), trustedCABundle, tls.GetClusterTLSProfileSpec(tlsProfile), receiverInputs)
utils.AddOwnerRefToObject(desired, owner)
return reconcile.DaemonSet(er, k8sClient, desired)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ const (
OptimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
OTELSchema = "opentelemetry"

HTTPReceiverPort = 8443
LabelHTTPInputService = "http-input-service"
LabelHTTPInputService = "http-input-service"
LabelSyslogInputService = "syslog-input-service"
)

var ReconcileForGlobalProxyList = []string{CollectorTrustedCAName}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package fluentdforward

import (
"github.com/openshift/cluster-logging-operator/internal/generator/helpers/security"
"github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/generator/helpers/security"
"strings"

logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
Expand Down
2 changes: 1 addition & 1 deletion internal/generator/fluentd/output/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package loki

import (
"fmt"
"github.com/openshift/cluster-logging-operator/internal/generator/helpers/security"
. "github.com/openshift/cluster-logging-operator/internal/generator/framework"
"github.com/openshift/cluster-logging-operator/internal/generator/helpers/security"
"strings"

logging "github.com/openshift/cluster-logging-operator/apis/logging/v1"
Expand Down

0 comments on commit 34a5485

Please sign in to comment.