Skip to content
This repository has been archived by the owner on Jul 10, 2020. It is now read-only.

Commit

Permalink
Add sink resources
Browse files Browse the repository at this point in the history
- Add sink resources config
- Add sink and event controllers
- Add tests for sink-resources CRDs
- Add test infra
- Add CONTRIBUTING guidelines for development
- Add e2e tests

Signed-off-by: Jason Keene <jkeene@pivotal.io>
  • Loading branch information
Warren Fernandes authored and Jason Keene committed Nov 14, 2018
1 parent 9f998fd commit e28485b
Show file tree
Hide file tree
Showing 97 changed files with 6,739 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[submodule "vendor/k8s.io/code-generator"]
path = vendor/k8s.io/code-generator
url = https://github.com/kubernetes/code-generator
branch = release-1.11
[submodule "vendor/github.com/knative/test-infra"]
path = vendor/github.com/knative/test-infra
url = https://github.com/knative/test-infra
16 changes: 16 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@


## Updating Dependencies

Make sure the repo exists in your `GOPATH` under:

```
$GOPATH/src/github.com/knative/observability
```

From here run:

```
GO111MODULE=on go mod vendor
git submodule update
```
112 changes: 112 additions & 0 deletions cmd/event-controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main

import (
"context"
_ "expvar"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

envstruct "code.cloudfoundry.org/go-envstruct"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/fluent/fluent-logger-golang/fluent"
"github.com/knative/observability/pkg/event"
)

type config struct {
Host string `env:"FORWARDER_HOST,required,report"`
MetricsPort string `env:"METRICS_PORT,report"`
}

func main() {
ctx := setupSignalHandler()

conf := config{
MetricsPort: "6060",
}
err := envstruct.Load(&conf)
if err != nil {
log.Fatal(err.Error())
}
err = envstruct.WriteReport(&conf)
if err != nil {
log.Fatal(err.Error())
}

go http.ListenAndServe(net.JoinHostPort("", conf.MetricsPort), http.DefaultServeMux)

cfg, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err.Error())
}

kclientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
log.Fatal(err.Error())
}

f, err := fluent.New(fluent.Config{
FluentHost: conf.Host,
})
if err != nil {
log.Fatalf("unable to create fluent logger client: %s", err)
}

controller := event.NewController(f)

informerFactory := informers.NewSharedInformerFactory(kclientset, 30*time.Second)

eventInformer := informerFactory.Core().V1().Events().Informer()
eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.AddFunc,
DeleteFunc: controller.DeleteFunc,
UpdateFunc: controller.UpdateFunc,
})

eventInformer.Run(ctx.Done())
}

var onlyOneSignalHandler = make(chan struct{})

// setupSignalHandler registers SIGTERM and SIGINT. A context is returned
// which is canceled on one of these signals. If a second signal is caught,
// the program is terminated with exit code 1.
func setupSignalHandler() context.Context {
close(onlyOneSignalHandler) // only call once, panic on calls > 1

ctx, cancel := context.WithCancel(context.Background())

c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
cancel()
<-c
os.Exit(1) // second signal. Exit directly.
}()

return ctx
}
123 changes: 123 additions & 0 deletions cmd/sink-controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main

import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"

envstruct "code.cloudfoundry.org/go-envstruct"
"github.com/knative/observability/pkg/client/clientset/versioned"
informers "github.com/knative/observability/pkg/client/informers/externalversions"
"github.com/knative/observability/pkg/sink"
coreV1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

type config struct {
Namespace string `env:"NAMESPACE,required,report"`
}

func main() {
flag.Parse()
ctx := setupSignalHandler()

var conf config
err := envstruct.Load(&conf)
if err != nil {
log.Fatal(err.Error())
}
err = envstruct.WriteReport(&conf)
if err != nil {
log.Fatal(err.Error())
}

cfg, err := rest.InClusterConfig()
if err != nil {
log.Fatal(err.Error())
}

client, err := versioned.NewForConfig(cfg)
if err != nil {
log.Fatal(err.Error())
}

coreV1Client, err := coreV1.NewForConfig(cfg)
if err != nil {
log.Fatal(err.Error())
}

sinkConfig := sink.NewConfig()

controller := sink.NewController(
coreV1Client.ConfigMaps(conf.Namespace),
coreV1Client.Pods(conf.Namespace),
sinkConfig,
)

clusterController := sink.NewClusterController(
coreV1Client.ConfigMaps(conf.Namespace),
coreV1Client.Pods(conf.Namespace),
sinkConfig,
)

sinkInformerFactory := informers.NewSharedInformerFactory(client, time.Second*30)

sinkInformer := sinkInformerFactory.Observability().V1alpha1().LogSinks().Informer()
sinkInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.AddFunc,
DeleteFunc: controller.DeleteFunc,
UpdateFunc: controller.UpdateFunc,
})

clusterSinkInformer := sinkInformerFactory.Observability().V1alpha1().ClusterLogSinks().Informer()
clusterSinkInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: clusterController.AddFunc,
DeleteFunc: clusterController.DeleteFunc,
UpdateFunc: clusterController.UpdateFunc,
})

go sinkInformer.Run(ctx.Done())
clusterSinkInformer.Run(ctx.Done())
}

var onlyOneSignalHandler = make(chan struct{})

// setupSignalHandler registers SIGTERM and SIGINT. A context is returned
// which is canceled on one of these signals. If a second signal is caught,
// the program is terminated with exit code 1.
func setupSignalHandler() context.Context {
close(onlyOneSignalHandler) // only call once, panic on calls > 1

ctx, cancel := context.WithCancel(context.Background())

c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
cancel()
<-c
os.Exit(1) // second signal. Exit directly.
}()

return ctx
}
76 changes: 76 additions & 0 deletions config/100-cluster-log-sink-crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: clusterlogsinks.observability.knative.dev
spec:
group: observability.knative.dev
version: v1alpha1
versions:
- name: v1alpha1
served: true
storage: true
scope: Cluster
names:
plural: clusterlogsinks
singular: clusterlogsink
kind: ClusterLogSink
validation:
openAPIV3Schema:
properties:
spec:
required:
- type
- port
- host
properties:
port:
type: integer
minimum: 0
maximum: 65535
type:
type: string
enum:
- syslog
host:
type: string
pattern: '^([a-zA-Z0-9-\.]+)$|^([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})$|^([a-fA-F0-9\:]+)$'
enable_tls:
type: boolean
insecure_skip_verify:
type: boolean
additionalPrinterColumns:
- name: Type
JSONPath: .spec.type
type: string
- name: Host
JSONPath: .spec.host
type: string
- name: Port
JSONPath: .spec.port
type: integer
- name: TLS
JSONPath: .spec.enable_tls
type: boolean
- name: Insecure
JSONPath: .spec.insecure_skip_verify
type: boolean
description: |
Accept any certificate presented by the server and any host name in
that certificate.
- JSONPath: .metadata.creationTimestamp
name: Age
type: date

0 comments on commit e28485b

Please sign in to comment.