Skip to content

Commit

Permalink
Implemented Cron Job source
Browse files Browse the repository at this point in the history
Used Deployment as receive adapter instead of CronJob becasue of
the batched job side car issue.

kubernetes/kubernetes#25908

If this issue is resolved in the future, we can switch to use CronJob.
  • Loading branch information
whynowy committed Nov 30, 2018
1 parent 4338ed7 commit 7f62e9d
Show file tree
Hide file tree
Showing 41 changed files with 3,552 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions cmd/cronjob_receive_adapter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"log"
"os"

"github.com/knative/eventing-sources/pkg/adapter/cronjobevents"
"go.uber.org/zap"

"golang.org/x/net/context"
)

const (
// Environment variable container schedule.
envSchedue = "SCHEDULE"

// Environment variable containing data.
envData = "DATA"

// Sink for messages.
envSinkURI = "SINK_URI"
)

func getRequiredEnv(envKey string) string {
val, defined := os.LookupEnv(envKey)
if !defined {
log.Fatalf("required environment variable not defined '%s'", envKey)
}
return val
}

func main() {
flag.Parse()

ctx := context.Background()
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("Unable to create logger: %v", err)
}

adapter := &cronjobevents.Adapter{
Schedule: getRequiredEnv(envSchedue),
Data: getRequiredEnv(envData),
SinkURI: getRequiredEnv(envSinkURI),
}

logger.Info("Starting Receive Adapter. %v", zap.Reflect("adapter", adapter))
if err := adapter.Start(ctx); err != nil {
logger.Fatal("Failed to start adapter: ", zap.Error(err))
}
}
70 changes: 70 additions & 0 deletions config/crds/sources_v1alpha1_cronjobsource.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
creationTimestamp: null
labels:
controller-tools.k8s.io: "1.0"
name: cronjobsources.sources.eventing.knative.dev
spec:
group: sources.eventing.knative.dev
names:
categories:
- all
- knative
- eventing
- sources
kind: CronJobSource
plural: cronjobsources
scope: Namespaced
validation:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
schedule:
type: string
data:
type: string
serviceAccountName:
type: string
sink:
type: object
type: object
status:
properties:
conditions:
items:
properties:
lastTransitionTime:
type: object
message:
type: string
reason:
type: string
severity:
type: string
status:
type: string
type:
type: string
required:
- type
- status
type: object
type: array
sinkUri:
type: string
type: object
version: v1alpha1
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
85 changes: 85 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,77 @@ status:
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
creationTimestamp: null
labels:
controller-tools.k8s.io: "1.0"
name: cronjobsources.sources.eventing.knative.dev
spec:
group: sources.eventing.knative.dev
names:
categories:
- all
- knative
- eventing
- sources
kind: CronJobSource
plural: cronjobsources
scope: Namespaced
validation:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
data:
type: string
schedule:
type: string
serviceAccountName:
type: string
sink:
type: object
type: object
status:
properties:
conditions:
items:
properties:
lastTransitionTime:
type: string
message:
type: string
reason:
type: string
severity:
type: string
status:
type: string
type:
type: string
required:
- type
- status
type: object
type: array
sinkUri:
type: string
type: object
version: v1alpha1
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
creationTimestamp: null
labels:
Expand Down Expand Up @@ -414,6 +485,18 @@ rules:
- update
- patch
- delete
- apiGroups:
- sources.eventing.knative.dev
resources:
- cronjobsources
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
Expand Down Expand Up @@ -482,6 +565,8 @@ spec:
value: github.com/knative/eventing-sources/cmd/kuberneteseventsource
- name: GH_RA_IMAGE
value: github.com/knative/eventing-sources/cmd/githubsource
- name: CRONJOB_RA_IMAGE
value: github.com/knative/eventing-sources/cmd/cronjob_receive_adapter
image: github.com/knative/eventing-sources/cmd/manager
name: manager
resources:
Expand Down
17 changes: 17 additions & 0 deletions config/default/cronjobsource_conditions_patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# this fixes the auto-detected object, we use a string in the stored object but
# a wrapper object for time at runtime.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: cronjobsources.sources.eventing.knative.dev
spec:
validation:
openAPIV3Schema:
properties:
status:
properties:
conditions:
items:
properties:
lastTransitionTime:
type: string
2 changes: 2 additions & 0 deletions config/default/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ resources:
- ../crds/sources_v1alpha1_containersource.yaml
- ../crds/sources_v1alpha1_kuberneteseventsource.yaml
- ../crds/sources_v1alpha1_githubsource.yaml
- ../crds/sources_v1alpha1_cronjobsource.yaml
- ../rbac/rbac_role.yaml
- ../rbac/rbac_role_binding.yaml
- ../manager/manager.yaml
Expand All @@ -19,3 +20,4 @@ patches:
- containersource_conditions_patch.yaml
- kuberneteseventsource_conditions_patch.yaml
- githubsource_conditions_patch.yaml
- cronjobsource_conditions_patch.yaml
2 changes: 2 additions & 0 deletions config/default/manager_image_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ spec:
value: github.com/knative/eventing-sources/cmd/kuberneteseventsource
- name: GH_RA_IMAGE
value: github.com/knative/eventing-sources/cmd/githubsource
- name: CRONJOB_RA_IMAGE
value: github.com/knative/eventing-sources/cmd/cronjob_receive_adapter
12 changes: 12 additions & 0 deletions config/rbac/rbac_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ rules:
- update
- patch
- delete
- apiGroups:
- sources.eventing.knative.dev
resources:
- cronjobsources
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
resources:
Expand Down
Loading

0 comments on commit 7f62e9d

Please sign in to comment.